From ee1b0f1cfa1dbc85aedfce0f9140525e1cfe2293 Mon Sep 17 00:00:00 2001
From: Josako
Date: Wed, 25 Sep 2024 15:39:25 +0200
Subject: [PATCH] Start trace logging for business events. Storage in both
 database and logging backend.

---
 common/models/monitoring.py                   |  33 +-
 common/utils/business_event.py                | 109 +++++
 common/utils/business_event_context.py        |  25 ++
 config/logging_config.py                      |  20 +-
 eveai_workers/Processors/html_processor.py    |   1 +
 eveai_workers/tasks.py                        | 387 ++++--------------
 ...2cbdb23ae02e_corrected_businesseventlog.py |  49 +++
 .../e3c6ff8c22df_updated_monitoring_setup.py  |  67 +++
 8 files changed, 370 insertions(+), 321 deletions(-)
 create mode 100644 common/utils/business_event.py
 create mode 100644 common/utils/business_event_context.py
 create mode 100644 migrations/public/versions/2cbdb23ae02e_corrected_businesseventlog.py
 create mode 100644 migrations/public/versions/e3c6ff8c22df_updated_monitoring_setup.py

diff --git a/common/models/monitoring.py b/common/models/monitoring.py
index de4045f..2655e72 100644
--- a/common/models/monitoring.py
+++ b/common/models/monitoring.py
@@ -1,28 +1,21 @@
 from common.extensions import db
-from sqlalchemy.dialects.postgresql import JSONB
-import sqlalchemy as sa


-class LLMUsageMetric(db.Model):
+class BusinessEventLog(db.Model):
     __bind_key__ = 'public'
     __table_args__ = {'schema': 'public'}

     id = db.Column(db.Integer, primary_key=True)
-    tenant_id = db.Column(db.Integer, nullable=False)
-    environment = db.Column(db.String(20), nullable=False)
-    activity = db.Column(db.String(20), nullable=False)
-    sub_activity = db.Column(db.String(20), nullable=False)
-    activity_detail = db.Column(db.String(50), nullable=True)
-    session_id = db.Column(db.String(50), nullable=True)  # Chat Session ID
-    interaction_id = db.Column(db.Integer, nullable=True)  # Chat Interaction ID
-    document_version_id = db.Column(db.Integer, nullable=True)
-    prompt_tokens = db.Column(db.Integer, nullable=True)
-    completion_tokens = db.Column(db.Integer, nullable=True)
-    total_tokens = db.Column(db.Integer, nullable=True)
-    cost = db.Column(db.Float, nullable=True)
-    latency = db.Column(db.Float, nullable=True)
-    model_name = db.Column(db.String(50), nullable=False)
     timestamp = db.Column(db.DateTime, nullable=False)
-    additional_info = db.Column(JSONB, nullable=True)
-
-    # Add any additional fields or methods as needed
+    event_type = db.Column(db.String(50), nullable=False)
+    tenant_id = db.Column(db.Integer, nullable=False)
+    trace_id = db.Column(db.String(50), nullable=False)
+    span_id = db.Column(db.String(50))
+    span_name = db.Column(db.String(50))
+    parent_span_id = db.Column(db.String(50))
+    document_version_id = db.Column(db.Integer)
+    chat_session_id = db.Column(db.Integer)
+    interaction_id = db.Column(db.Integer)
+    environment = db.Column(db.String(20))
+    message = db.Column(db.Text)
+    # Add any other fields relevant for invoicing or warnings
\ No newline at end of file

diff --git a/common/utils/business_event.py b/common/utils/business_event.py
new file mode 100644
index 0000000..0fbef46
--- /dev/null
+++ b/common/utils/business_event.py
@@ -0,0 +1,109 @@
+import os
+import uuid
+import logging
+from contextlib import contextmanager
+from datetime import datetime as dt, timezone as tz
+from typing import Any
+
+from .business_event_context import BusinessEventContext
+from common.models.monitoring import BusinessEventLog
+from common.extensions import db
+
+
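+# Typical usage (an illustrative sketch; `current_event` is the LocalProxy
+# defined in business_event_context.py and resolves to the active event):
+#
+#     with BusinessEvent('Create Embeddings', tenant_id, document_version_id=17):
+#         current_event.log("Starting work")
+#         with current_event.create_span("Chunking"):
+#             current_event.log("Logged inside the Chunking span")
+
+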
+class BusinessEvent:
+    # The BusinessEvent class is itself a context manager, but it doesn't use the
+    # @contextmanager decorator. Instead, it defines __enter__ and __exit__
+    # explicitly, because entering and exiting needs to interact with the
+    # BusinessEventContext and its _business_event_stack.
+
+    def __init__(self, event_type: str, tenant_id: int, **kwargs):
+        self.event_type = event_type
+        self.tenant_id = tenant_id
+        self.trace_id = str(uuid.uuid4())
+        self.span_id = None
+        self.span_name = None
+        self.parent_span_id = None
+        self.document_version_id = kwargs.get('document_version_id')
+        self.chat_session_id = kwargs.get('chat_session_id')
+        self.interaction_id = kwargs.get('interaction_id')
+        self.environment = os.environ.get("FLASK_ENV", "development")
+        self.span_counter = 0
+        self.spans = []
+
+    def update_attribute(self, attribute: str, value: Any):
+        if hasattr(self, attribute):
+            setattr(self, attribute, value)
+        else:
+            raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{attribute}'")
+
+    @contextmanager
+    def create_span(self, span_name: str):
+        # The create_span method is designed to be used as a context manager. We want
+        # to perform some actions when entering the span (like setting the span ID
+        # and name) and some actions when exiting the span (like removing these
+        # temporary attributes). The @contextmanager decorator allows us to write
+        # this method in a way that clearly separates the "entry" and "exit" logic,
+        # with the yield statement in between.
+
+        parent_span_id = self.span_id
+        self.span_counter += 1
+        new_span_id = f"{self.trace_id}-{self.span_counter}"
+
+        # Save the current span info
+        self.spans.append((self.span_id, self.span_name, self.parent_span_id))
+
+        # Set the new span info
+        self.span_id = new_span_id
+        self.span_name = span_name
+        self.parent_span_id = parent_span_id
+
+        try:
+            yield
+        finally:
+            # Restore the previous span info
+            if self.spans:
+                self.span_id, self.span_name, self.parent_span_id = self.spans.pop()
+            else:
+                self.span_id = None
+                self.span_name = None
+                self.parent_span_id = None
+
+    def log(self, message: str, level: str = 'info'):
+        logger = logging.getLogger('business_events')
+        log_data = {
+            'event_type': self.event_type,
+            'tenant_id': self.tenant_id,
+            'trace_id': self.trace_id,
+            'span_id': self.span_id,
+            'span_name': self.span_name,
+            'parent_span_id': self.parent_span_id,
+            'document_version_id': self.document_version_id,
+            'chat_session_id': self.chat_session_id,
+            'interaction_id': self.interaction_id,
+            'environment': self.environment
+        }
+        # Log to Graylog via the 'business_events' logger
+        getattr(logger, level)(message, extra=log_data)
+
+        # Log to the database as well. Note that this commits on every call, so
+        # each business event log line is its own transaction.
+        event_log = BusinessEventLog(
+            timestamp=dt.now(tz=tz.utc),
+            event_type=self.event_type,
+            tenant_id=self.tenant_id,
+            trace_id=self.trace_id,
+            span_id=self.span_id,
+            span_name=self.span_name,
+            parent_span_id=self.parent_span_id,
+            document_version_id=self.document_version_id,
+            chat_session_id=self.chat_session_id,
+            interaction_id=self.interaction_id,
+            environment=self.environment,
+            message=message
+        )
+        db.session.add(event_log)
+        db.session.commit()
+
+    def __enter__(self):
+        # Use a single BusinessEventContext for both enter and exit, so the same
+        # object that pushes this event onto the stack also pops it off again.
+        self._context = BusinessEventContext(self)
+        return self._context.__enter__()
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        return self._context.__exit__(exc_type, exc_val, exc_tb)
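+
+# Illustrative sketch of the span numbering (assuming an active event whose
+# trace_id is 'abc'): "Outer" gets span_id 'abc-1' and "Inner" gets 'abc-2',
+# with 'abc-1' as its parent_span_id:
+#
+#     with current_event.create_span("Outer"):
+#         with current_event.create_span("Inner"):
+#             current_event.log("logged with span_id 'abc-2'")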
diff --git a/common/utils/business_event_context.py b/common/utils/business_event_context.py
new file mode 100644
index 0000000..42a85e2
--- /dev/null
+++ b/common/utils/business_event_context.py
@@ -0,0 +1,25 @@
+from werkzeug.local import LocalProxy, LocalStack
+
+_business_event_stack = LocalStack()
+
+
+def _get_current_event():
+    top = _business_event_stack.top
+    if top is None:
+        raise RuntimeError("No business event context found. Are you sure you're in a business event?")
+    return top
+
+
+current_event = LocalProxy(_get_current_event)
+
+
+class BusinessEventContext:
+    def __init__(self, event):
+        self.event = event
+
+    def __enter__(self):
+        _business_event_stack.push(self.event)
+        return self.event
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        _business_event_stack.pop()
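+
+# Illustrative note: code that can also run outside a `with BusinessEvent(...)`
+# block should guard its use of `current_event`, since the proxy raises
+# RuntimeError when no event is active:
+#
+#     try:
+#         current_event.log("checkpoint")
+#     except RuntimeError:
+#         pass  # no active business event; skip business logging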
diff --git a/config/logging_config.py b/config/logging_config.py
index e767138..7bffdeb 100644
--- a/config/logging_config.py
+++ b/config/logging_config.py
@@ -12,7 +12,12 @@ env = os.environ.get('FLASK_ENV', 'development')
 class CustomLogRecord(logging.LogRecord):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
-        self.component = os.environ.get('COMPONENT_NAME', 'eveai_app')  # Set default component value here
+        self.component = os.environ.get('COMPONENT_NAME', 'eveai_app')
+
+    def __setattr__(self, name, value):
+        # Silently drop direct assignment of the business event fields: they are
+        # injected via `extra` (which writes to the record's __dict__ directly),
+        # so this keeps other code from clobbering them on the record.
+        if name not in {'event_type', 'tenant_id', 'trace_id', 'span_id', 'span_name', 'parent_span_id',
+                        'document_version_id', 'chat_session_id', 'interaction_id', 'environment'}:
+            super().__setattr__(name, value)


 def custom_log_record_factory(*args, **kwargs):
@@ -108,6 +113,14 @@ LOGGING = {
             'backupCount': 10,
             'formatter': 'standard',
         },
+        'file_business_events': {
+            'level': 'INFO',
+            'class': 'logging.handlers.RotatingFileHandler',
+            'filename': 'logs/business_events.log',
+            'maxBytes': 1024 * 1024 * 5,  # 5MB
+            'backupCount': 10,
+            'formatter': 'standard',
+        },
         'console': {
             'class': 'logging.StreamHandler',
             'level': 'DEBUG',
@@ -184,6 +197,11 @@ LOGGING = {
             'level': 'DEBUG',
             'propagate': False
         },
+        'business_events': {
+            'handlers': ['file_business_events', 'graylog'],
+            'level': 'DEBUG',
+            'propagate': False
+        },
         '': {  # root logger
             'handlers': ['console'],
             'level': 'WARNING',  # Set higher level for root to minimize noise

diff --git a/eveai_workers/Processors/html_processor.py b/eveai_workers/Processors/html_processor.py
index fbb7082..acc307c 100644
--- a/eveai_workers/Processors/html_processor.py
+++ b/eveai_workers/Processors/html_processor.py
@@ -5,6 +5,7 @@ from langchain_core.runnables import RunnablePassthrough
 from common.extensions import db, minio_client
 from common.utils.model_utils import create_language_template
 from .processor import Processor
+from common.utils.business_event_context import current_event


 class HTMLProcessor(Processor):

diff --git a/eveai_workers/tasks.py b/eveai_workers/tasks.py
index 8220f8f..cd86fc2 100644
--- a/eveai_workers/tasks.py
+++ b/eveai_workers/tasks.py
@@ -24,6 +24,9 @@ from eveai_workers.Processors.html_processor import HTMLProcessor
 from eveai_workers.Processors.pdf_processor import PDFProcessor
 from eveai_workers.Processors.srt_processor import SRTProcessor

+from common.utils.business_event import BusinessEvent
+from common.utils.business_event_context import current_event
+

 # Healthcheck task
 @current_celery.task(name='ping', queue='embeddings')
@@ -33,76 +36,80 @@ def ping():

 @current_celery.task(name='create_embeddings', queue='embeddings')
 def create_embeddings(tenant_id, document_version_id):
-    current_app.logger.info(f'Creating embeddings for tenant {tenant_id} on document version {document_version_id}.')
+    # BusinessEvent creates a context, which is why we need to use it in a `with` block
+    with BusinessEvent('Create Embeddings', tenant_id, document_version_id=document_version_id):
+        current_app.logger.info(f'Creating embeddings for tenant {tenant_id} on document version {document_version_id}')
+        current_event.log("Starting Embedding Creation Task")

-    try:
-        # Retrieve Tenant for which we are processing
-        tenant = Tenant.query.get(tenant_id)
-        if tenant is None:
-            raise Exception(f'Tenant {tenant_id} not found')
+        try:
+            # Retrieve Tenant for which we are processing
+            tenant = Tenant.query.get(tenant_id)
+            if tenant is None:
+                raise Exception(f'Tenant {tenant_id} not found')

-        # Ensure we are working in the correct database schema
-        Database(tenant_id).switch_schema()
+            # Ensure we are working in the correct database schema
+            Database(tenant_id).switch_schema()

-        # Select variables to work with depending on tenant and model
-        model_variables = select_model_variables(tenant)
-        current_app.logger.debug(f'Model variables: {model_variables}')
+            # Select variables to work with depending on tenant and model
+            model_variables = select_model_variables(tenant)
+            current_app.logger.debug(f'Model variables: {model_variables}')

-        # Retrieve document version to process
-        document_version = DocumentVersion.query.get(document_version_id)
-        if document_version is None:
-            raise Exception(f'Document version {document_version_id} not found')
+            # Retrieve document version to process
+            document_version = DocumentVersion.query.get(document_version_id)
+            if document_version is None:
+                raise Exception(f'Document version {document_version_id} not found')

-    except Exception as e:
-        current_app.logger.error(f'Create Embeddings request received '
-                                 f'for non existing document version {document_version_id} '
-                                 f'for tenant {tenant_id}, '
-                                 f'error: {e}')
-        raise
+        except Exception as e:
+            current_app.logger.error(f'Create Embeddings request received '
+                                     f'for non-existent document version {document_version_id} '
+                                     f'for tenant {tenant_id}, '
+                                     f'error: {e}')
+            raise

-    try:
-        db.session.add(document_version)
+        try:
+            db.session.add(document_version)

-        # start processing
-        document_version.processing = True
-        document_version.processing_started_at = dt.now(tz.utc)
-        document_version.processing_finished_at = None
-        document_version.processing_error = None
+            # start processing
+            document_version.processing = True
+            document_version.processing_started_at = dt.now(tz.utc)
+            document_version.processing_finished_at = None
+            document_version.processing_error = None

-        db.session.commit()
-    except SQLAlchemyError as e:
-        current_app.logger.error(f'Unable to save Embedding status information '
-                                 f'in document version {document_version_id} '
-                                 f'for tenant {tenant_id}')
-        raise
+            db.session.commit()
+        except SQLAlchemyError as e:
+            current_app.logger.error(f'Unable to save Embedding status information '
+                                     f'in document version {document_version_id} '
+                                     f'for tenant {tenant_id}')
+            raise

-    delete_embeddings_for_document_version(document_version)
+        delete_embeddings_for_document_version(document_version)

-    try:
-        match document_version.file_type:
-            case 'pdf':
-                process_pdf(tenant, model_variables, document_version)
-            case 'html':
-                process_html(tenant, model_variables, document_version)
-            case 'srt':
-                process_srt(tenant, model_variables, document_version)
-            case 'mp4' | 'mp3' | 'ogg':
-                process_audio(tenant, model_variables, document_version)
-            case _:
-                raise Exception(f'No functionality defined for file type {document_version.file_type} '
-                                f'for tenant {tenant_id} '
-                                f'while creating embeddings for document version {document_version_id}')
+        try:
+            match document_version.file_type:
+                case 'pdf':
+                    process_pdf(tenant, model_variables, document_version)
+                case 'html':
+                    process_html(tenant, model_variables, document_version)
+                case 'srt':
+                    process_srt(tenant, model_variables, document_version)
+                case 'mp4' | 'mp3' | 'ogg':
+                    process_audio(tenant, model_variables, document_version)
+                case _:
+                    raise Exception(f'No functionality defined for file type {document_version.file_type} '
+                                    f'for tenant {tenant_id} '
+                                    f'while creating embeddings for document version {document_version_id}')
+            current_event.log("Finished Embedding Creation Task")

-    except Exception as e:
-        current_app.logger.error(f'Error creating embeddings for tenant {tenant_id} '
-                                 f'on document version {document_version_id} '
-                                 f'error: {e}')
-        document_version.processing = False
-        document_version.processing_finished_at = dt.now(tz.utc)
-        document_version.processing_error = str(e)[:255]
-        db.session.commit()
-        create_embeddings.update_state(state=states.FAILURE)
-        raise
+        except Exception as e:
+            current_app.logger.error(f'Error creating embeddings for tenant {tenant_id} '
+                                     f'on document version {document_version_id} '
+                                     f'error: {e}')
+            document_version.processing = False
+            document_version.processing_finished_at = dt.now(tz.utc)
+            document_version.processing_error = str(e)[:255]
+            db.session.commit()
+            create_embeddings.update_state(state=states.FAILURE)
+            raise


 def delete_embeddings_for_document_version(document_version):
@@ -118,38 +125,48 @@ def delete_embeddings_for_document_version(document_version):


 def process_pdf(tenant, model_variables, document_version):
+    current_event.log("Starting PDF Processing")
     processor = PDFProcessor(tenant, model_variables, document_version)
     markdown, title = processor.process()

     # Process markdown and embed
     embed_markdown(tenant, model_variables, document_version, markdown, title)
+    current_event.log("Finished PDF Processing")


 def process_html(tenant, model_variables, document_version):
-    processor = HTMLProcessor(tenant, model_variables, document_version)
-    markdown, title = processor.process()
+    with current_event.create_span("HTML Processing"):
+        processor = HTMLProcessor(tenant, model_variables, document_version)
+        markdown, title = processor.process()

     # Process markdown and embed
-    embed_markdown(tenant, model_variables, document_version, markdown, title)
+    with current_event.create_span("Embedding"):
+        embed_markdown(tenant, model_variables, document_version, markdown, title)
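+    # Note: unlike process_pdf/process_audio/process_srt, which emit paired
+    # start/finish log lines, this function wraps each phase in a span; any
+    # current_event.log() call made inside these blocks picks up that span's
+    # span_id and parent_span_id.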
+


 def process_audio(tenant, model_variables, document_version):
+    current_event.log("Starting Audio Processing")
     processor = AudioProcessor(tenant, model_variables, document_version)
     markdown, title = processor.process()

     # Process markdown and embed
     embed_markdown(tenant, model_variables, document_version, markdown, title)
+    current_event.log("Finished Audio Processing")


 def process_srt(tenant, model_variables, document_version):
+    current_event.log("Starting SRT Processing")
     processor = SRTProcessor(tenant, model_variables, document_version)
     markdown, title = processor.process()

     # Process markdown and embed
     embed_markdown(tenant, model_variables, document_version, markdown, title)
+    current_event.log("Finished SRT Processing")


 def embed_markdown(tenant, model_variables, document_version, markdown, title):
+    current_event.log("Starting Embedding Markdown Processing")
     # Create potential chunks
     potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version,
                                                             f"{document_version.id}.md")

@@ -178,9 +195,11 @@ def embed_markdown(tenant, model_variables, document_version, markdown, title):

     current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} '
                             f'on document version {document_version.id} :-)')
+    current_event.log("Finished Embedding Markdown Processing")


 def enrich_chunks(tenant, model_variables, document_version, title, chunks):
+    current_event.log("Starting Enriching Chunks Processing")
     current_app.logger.debug(f'Enriching chunks for tenant {tenant.id} '
                              f'on document version {document_version.id}')

@@ -213,11 +232,13 @@ def enrich_chunks(tenant, model_variables, document_version, title, chunks):

     current_app.logger.debug(f'Finished enriching chunks for tenant {tenant.id} '
                              f'on document version {document_version.id}')
+    current_event.log("Finished Enriching Chunks Processing")

     return enriched_chunks


 def summarize_chunk(tenant, model_variables, document_version, chunk):
+    current_event.log("Starting Summarizing Chunk Processing")
     current_app.logger.debug(f'Summarizing chunk for tenant {tenant.id} '
                              f'on document version {document_version.id}')
     llm = model_variables['llm']
@@ -235,6 +256,7 @@ def summarize_chunk(tenant, model_variables, document_version, chunk):
         summary = chain.invoke({"text": chunk})
         current_app.logger.debug(f'Finished summarizing chunk for tenant {tenant.id} '
                                  f'on document version {document_version.id}.')
+        current_event.log("Finished Summarizing Chunk Processing")
         return summary
     except LangChainException as e:
         current_app.logger.error(f'Error creating summary for chunk enrichment for tenant {tenant.id} '
@@ -244,6 +266,7 @@


 def embed_chunks(tenant, model_variables, document_version, chunks):
+    current_event.log("Starting Embedding Chunks Processing")
     current_app.logger.debug(f'Embedding chunks for tenant {tenant.id} '
                              f'on document version {document_version.id}')
     embedding_model = model_variables['embedding_model']
@@ -268,6 +291,8 @@
             new_embedding.embedding = embedding
             new_embeddings.append(new_embedding)

+    current_app.logger.debug(f'Finished embedding chunks for tenant {tenant.id}')
+
     return new_embeddings


@@ -281,244 +306,6 @@ def log_parsing_info(tenant, tags, included_elements, excluded_elements, exclude
         current_app.embed_tuning_logger.debug(f'First element to parse: {elements_to_parse[0]}')


-# def process_youtube(tenant, model_variables, document_version):
-#     download_file_name = f'{document_version.id}.mp4'
-#     compressed_file_name = f'{document_version.id}.mp3'
-#     transcription_file_name = f'{document_version.id}.txt'
-#     markdown_file_name = f'{document_version.id}.md'
-#
-#     # Remove existing files (in case of a re-processing of the file
-#     minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language,
-#                                       document_version.id, download_file_name)
-#     minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language,
-#                                       document_version.id, compressed_file_name)
-#     minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language,
-#                                       document_version.id, transcription_file_name)
-#     minio_client.delete_document_file(tenant.id, document_version.doc_id, document_version.language,
-#                                       document_version.id, markdown_file_name)
-#
-#     of, title, description, author = download_youtube(document_version.url, tenant.id, document_version,
-#                                                       download_file_name)
-#     document_version.system_context = f'Title: {title}\nDescription: {description}\nAuthor: {author}'
-#     compress_audio(tenant.id, document_version, download_file_name, compressed_file_name)
-#     transcribe_audio(tenant.id, document_version, compressed_file_name, transcription_file_name, model_variables)
-#     annotate_transcription(tenant, document_version, transcription_file_name, markdown_file_name, model_variables)
-#
-#     potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, markdown_file_name)
-#     actual_chunks = combine_chunks_for_markdown(potential_chunks, model_variables['min_chunk_size'],
-#                                                 model_variables['max_chunk_size'])
-#
-#     enriched_chunks = enrich_chunks(tenant, document_version, actual_chunks)
-#     embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks)
-#
-#     try:
-#         db.session.add(document_version)
-#         document_version.processing_finished_at = dt.now(tz.utc)
-#         document_version.processing = False
-#         db.session.add_all(embeddings)
-#         db.session.commit()
-#     except SQLAlchemyError as e:
-#         current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} '
-#                                  f'on Youtube document version {document_version.id}'
-#                                  f'error: {e}')
-#         raise
-#
-#     current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} '
-#                             f'on Youtube document version {document_version.id} :-)')
-#
-#
-# def download_youtube(url, tenant_id, document_version, file_name):
-#     try:
-#         current_app.logger.info(f'Downloading YouTube video: {url} for tenant: {tenant_id}')
-#         yt = YouTube(url)
-#         stream = yt.streams.get_audio_only()
-#
-#         with tempfile.NamedTemporaryFile(delete=False) as temp_file:
-#             stream.download(output_path=temp_file.name)
-#             with open(temp_file.name, 'rb') as f:
-#                 file_data = f.read()
-#
-#         minio_client.upload_document_file(tenant_id, document_version.doc_id, document_version.language,
-#                                           document_version.id,
-#                                           file_name, file_data)
-#
-#         current_app.logger.info(f'Downloaded YouTube video: {url} for tenant: {tenant_id}')
-#         return file_name, yt.title, yt.description, yt.author
-#     except Exception as e:
-#         current_app.logger.error(f'Error downloading YouTube video: {url} for tenant: {tenant_id} with error: {e}')
-#         raise
-#
-#
-# def compress_audio(tenant_id, document_version, input_file, output_file):
-#     try:
-#         current_app.logger.info(f'Compressing audio for tenant: {tenant_id}')
-#
-#         input_data = minio_client.download_document_file(tenant_id, document_version.doc_id, document_version.language,
-#                                                          document_version.id, input_file)
-#
-#         with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as temp_input:
-#             temp_input.write(input_data)
-#             temp_input.flush()
-#
-#             with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as temp_output:
-#                 result = subprocess.run(
-#                     ['ffmpeg', '-i', temp_input.name, '-b:a', '64k', '-f', 'mp3', temp_output.name],
-#                     capture_output=True,
-#                     text=True
-#                 )
-#
-#                 if result.returncode != 0:
-#                     raise Exception(f"Compression failed: {result.stderr}")
-#
-#                 with open(temp_output.name, 'rb') as f:
-#                     compressed_data = f.read()
-#
-#         minio_client.upload_document_file(tenant_id, document_version.doc_id, document_version.language,
-#                                           document_version.id,
-#                                           output_file, compressed_data)
-#
-#         current_app.logger.info(f'Compressed audio for tenant: {tenant_id}')
-#     except Exception as e:
-#         current_app.logger.error(f'Error compressing audio for tenant: {tenant_id} with error: {e}')
-#         raise
-#
-#
-# def transcribe_audio(tenant_id, document_version, input_file, output_file, model_variables):
-#     try:
-#         current_app.logger.info(f'Transcribing audio for tenant: {tenant_id}')
-#         client = model_variables['transcription_client']
-#         model = model_variables['transcription_model']
-#
-#         # Download the audio file from MinIO
-#         audio_data = minio_client.download_document_file(tenant_id, document_version.doc_id, document_version.language,
-#                                                          document_version.id, input_file)
-#
-#         # Load the audio data into pydub
-#         audio = AudioSegment.from_mp3(io.BytesIO(audio_data))
-#
-#         # Define segment length (e.g., 10 minutes)
-#         segment_length = 10 * 60 * 1000  # 10 minutes in milliseconds
-#
-#         transcriptions = []
-#
-#         # Split audio into segments and transcribe each
-#         for i, chunk in enumerate(audio[::segment_length]):
-#             current_app.logger.debug(f'Transcribing chunk {i + 1} of {len(audio) // segment_length + 1}')
-#
-#             with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_audio:
-#                 chunk.export(temp_audio.name, format="mp3")
-#
-#                 with open(temp_audio.name, 'rb') as audio_segment:
-#                     transcription = client.audio.transcriptions.create(
-#                         file=audio_segment,
-#                         model=model,
-#                         language=document_version.language,
-#                         response_format='verbose_json',
-#                     )
-#
-#                 transcriptions.append(transcription.text)
-#
-#             os.unlink(temp_audio.name)  # Delete the temporary file
-#
-#         # Combine all transcriptions
-#         full_transcription = " ".join(transcriptions)
-#
-#         # Upload the full transcription to MinIO
-#         minio_client.upload_document_file(
-#             tenant_id,
-#             document_version.doc_id,
-#             document_version.language,
-#             document_version.id,
-#             output_file,
-#             full_transcription.encode('utf-8')
-#         )
-#
-#         current_app.logger.info(f'Transcribed audio for tenant: {tenant_id}')
-#     except Exception as e:
-#         current_app.logger.error(f'Error transcribing audio for tenant: {tenant_id}, with error: {e}')
-#         raise
-#
-#
-# def annotate_transcription(tenant, document_version, input_file, output_file, model_variables):
-#     try:
-#         current_app.logger.debug(f'Annotating transcription for tenant {tenant.id}')
-#
-#         char_splitter = CharacterTextSplitter(separator='.',
-#                                               chunk_size=model_variables['annotation_chunk_length'],
-#                                               chunk_overlap=0)
-#
-#         headers_to_split_on = [
-#             ("#", "Header 1"),
-#             ("##", "Header 2"),
-#         ]
-#         markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on, strip_headers=False)
-#
-#         llm = model_variables['llm']
-#         template = model_variables['transcript_template']
-#         language_template = create_language_template(template, document_version.language)
-#         transcript_prompt = ChatPromptTemplate.from_template(language_template)
-#         setup = RunnablePassthrough()
-#         output_parser = StrOutputParser()
-#
-#         # Download the transcription file from MinIO
-#         transcript_data = minio_client.download_document_file(tenant.id, document_version.doc_id,
-#                                                               document_version.language, document_version.id,
-#                                                               input_file)
-#         transcript = transcript_data.decode('utf-8')
-#
-#         chain = setup | transcript_prompt | llm | output_parser
-#
-#         chunks = char_splitter.split_text(transcript)
-#         all_markdown_chunks = []
-#         last_markdown_chunk = ''
-#         for chunk in chunks:
-#             current_app.logger.debug(f'Annotating next chunk of {len(chunks)} for tenant {tenant.id}')
-#             full_input = last_markdown_chunk + '\n' + chunk
-#             if tenant.embed_tuning:
-#                 current_app.embed_tuning_logger.debug(f'Annotating chunk: \n '
-#                                                       f'------------------\n'
-#                                                       f'{full_input}\n'
-#                                                       f'------------------\n')
-#             input_transcript = {'transcript': full_input}
-#             markdown = chain.invoke(input_transcript)
-#             # GPT-4o returns some kind of content description: ```markdown ```
-#             if markdown.startswith("```markdown"):
-#                 markdown = "\n".join(markdown.strip().split("\n")[1:-1])
-#             if tenant.embed_tuning:
-#                 current_app.embed_tuning_logger.debug(f'Markdown Received: \n '
-#                                                       f'------------------\n'
-#                                                       f'{markdown}\n'
-#                                                       f'------------------\n')
-#             md_header_splits = markdown_splitter.split_text(markdown)
-#             markdown_chunks = [doc.page_content for doc in md_header_splits]
-#             # claude-3.5-sonnet returns introductory text
-#             if not markdown_chunks[0].startswith('#'):
-#                 markdown_chunks.pop(0)
-#             last_markdown_chunk = markdown_chunks[-1]
-#             last_markdown_chunk = "\n".join(markdown.strip().split("\n")[1:])
-#             markdown_chunks.pop()
-#             all_markdown_chunks += markdown_chunks
-#
-#         all_markdown_chunks += [last_markdown_chunk]
-#
-#         annotated_transcript = '\n'.join(all_markdown_chunks)
-#
-#         # Upload the annotated transcript to MinIO
-#         minio_client.upload_document_file(
-#             tenant.id,
-#             document_version.doc_id,
-#             document_version.language,
-#             document_version.id,
-#             output_file,
-#             annotated_transcript.encode('utf-8')
-#         )
-#
-#         current_app.logger.info(f'Annotated transcription for tenant {tenant.id}')
-#     except Exception as e:
-#         current_app.logger.error(f'Error annotating transcription for tenant {tenant.id}, with error: {e}')
-#         raise
-
-
 def create_potential_chunks_for_markdown(tenant_id, document_version, input_file):
     try:
         current_app.logger.info(f'Creating potential chunks for tenant {tenant_id}')

diff --git a/migrations/public/versions/2cbdb23ae02e_corrected_businesseventlog.py b/migrations/public/versions/2cbdb23ae02e_corrected_businesseventlog.py
new file mode 100644
index 0000000..0ceb2cf
--- /dev/null
+++ b/migrations/public/versions/2cbdb23ae02e_corrected_businesseventlog.py
@@ -0,0 +1,49 @@
+"""Corrected BusinessEventLog
+
+Revision ID: 2cbdb23ae02e
+Revises: e3c6ff8c22df
+Create Date: 2024-09-25 10:17:40.154566
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = '2cbdb23ae02e'
+down_revision = 'e3c6ff8c22df'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table('business_event_log', schema=None) as batch_op:
+        batch_op.alter_column('span_id',
+                              existing_type=sa.VARCHAR(length=50),
+                              nullable=True)
+        batch_op.alter_column('span_name',
+                              existing_type=sa.VARCHAR(length=50),
+                              nullable=True)
+        batch_op.alter_column('parent_span_id',
+                              existing_type=sa.VARCHAR(length=50),
+                              nullable=True)
+
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table('business_event_log', schema=None) as batch_op:
+        batch_op.alter_column('parent_span_id',
+                              existing_type=sa.VARCHAR(length=50),
+                              nullable=False)
+        batch_op.alter_column('span_name',
+                              existing_type=sa.VARCHAR(length=50),
+                              nullable=False)
+        batch_op.alter_column('span_id',
+                              existing_type=sa.VARCHAR(length=50),
+                              nullable=False)
+
+    # ### end Alembic commands ###

diff --git a/migrations/public/versions/e3c6ff8c22df_updated_monitoring_setup.py b/migrations/public/versions/e3c6ff8c22df_updated_monitoring_setup.py
new file mode 100644
index 0000000..1cabea6
--- /dev/null
+++ b/migrations/public/versions/e3c6ff8c22df_updated_monitoring_setup.py
@@ -0,0 +1,67 @@
+"""Updated Monitoring Setup
+
+Revision ID: e3c6ff8c22df
+Revises: 25588210dab2
+Create Date: 2024-09-25 10:05:57.684506
+
+"""
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import postgresql
+
+# revision identifiers, used by Alembic.
+revision = 'e3c6ff8c22df'
+down_revision = '25588210dab2'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.create_table('business_event_log',
+                    sa.Column('id', sa.Integer(), nullable=False),
+                    sa.Column('timestamp', sa.DateTime(), nullable=False),
+                    sa.Column('event_type', sa.String(length=50), nullable=False),
+                    sa.Column('tenant_id', sa.Integer(), nullable=False),
+                    sa.Column('trace_id', sa.String(length=50), nullable=False),
+                    sa.Column('span_id', sa.String(length=50), nullable=False),
+                    sa.Column('span_name', sa.String(length=50), nullable=False),
+                    sa.Column('parent_span_id', sa.String(length=50), nullable=False),
+                    sa.Column('document_version_id', sa.Integer(), nullable=True),
+                    sa.Column('chat_session_id', sa.Integer(), nullable=True),
+                    sa.Column('interaction_id', sa.Integer(), nullable=True),
+                    sa.Column('environment', sa.String(length=20), nullable=True),
+                    sa.Column('message', sa.Text(), nullable=True),
+                    sa.PrimaryKeyConstraint('id'),
+                    schema='public'
+                    )
+    op.drop_table('llm_usage_metric')
+
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.create_table('llm_usage_metric',
+                    sa.Column('id', sa.INTEGER(), autoincrement=True, nullable=False),
+                    sa.Column('tenant_id', sa.INTEGER(), autoincrement=False, nullable=False),
+                    sa.Column('environment', sa.VARCHAR(length=20), autoincrement=False, nullable=False),
+                    sa.Column('activity', sa.VARCHAR(length=20), autoincrement=False, nullable=False),
+                    sa.Column('sub_activity', sa.VARCHAR(length=20), autoincrement=False, nullable=False),
+                    sa.Column('activity_detail', sa.VARCHAR(length=50), autoincrement=False, nullable=True),
+                    sa.Column('session_id', sa.VARCHAR(length=50), autoincrement=False, nullable=True),
+                    sa.Column('interaction_id', sa.INTEGER(), autoincrement=False, nullable=True),
+                    sa.Column('document_version_id', sa.INTEGER(), autoincrement=False, nullable=True),
+                    sa.Column('prompt_tokens', sa.INTEGER(), autoincrement=False, nullable=True),
+                    sa.Column('completion_tokens', sa.INTEGER(), autoincrement=False, nullable=True),
+                    sa.Column('total_tokens', sa.INTEGER(), autoincrement=False, nullable=True),
+                    sa.Column('cost', sa.DOUBLE_PRECISION(precision=53), autoincrement=False, nullable=True),
+                    sa.Column('latency', sa.DOUBLE_PRECISION(precision=53), autoincrement=False, nullable=True),
+                    sa.Column('model_name', sa.VARCHAR(length=50), autoincrement=False, nullable=False),
+                    sa.Column('timestamp', postgresql.TIMESTAMP(), autoincrement=False, nullable=False),
+                    sa.Column('additional_info', postgresql.JSONB(astext_type=sa.Text()), autoincrement=False, nullable=True),
+                    sa.PrimaryKeyConstraint('id', name='llm_usage_metric_pkey')
+                    )
+    op.drop_table('business_event_log', schema='public')
+    # ### end Alembic commands ###