From 4bf12db14277f53edfbefa7a53b0766094a9a94c Mon Sep 17 00:00:00 2001 From: Josako Date: Wed, 16 Apr 2025 15:39:16 +0200 Subject: [PATCH] - Switched the PDF processor to Mistral's OCR model - Split overly long chunks into smaller chunks before embedding - Batched TrackedMistralAIEmbeddings requests so large inputs are embedded reliably - Upgraded several dependencies to newer versions --- .../eveai_model/tracked_mistral_embeddings.py | 135 +++++++-- .../eveai_model/tracked_mistral_ocr_client.py | 53 ++++ common/models/entitlements.py | 1 + common/utils/business_event.py | 60 ++-- common/utils/model_utils.py | 5 + eveai_entitlements/tasks.py | 1 + eveai_workers/processors/pdf_processor.py | 10 +- eveai_workers/tasks.py | 275 +++++++++++++++++- ...c22f_add_nr_of_pages_to_llm_metrics_in_.py | 31 ++ requirements.txt | 38 +-- 10 files changed, 518 insertions(+), 91 deletions(-) create mode 100644 common/eveai_model/tracked_mistral_ocr_client.py create mode 100644 migrations/public/versions/605395afc22f_add_nr_of_pages_to_llm_metrics_in_.py diff --git a/common/eveai_model/tracked_mistral_embeddings.py b/common/eveai_model/tracked_mistral_embeddings.py index 73d018d..0a44fbf 100644 --- a/common/eveai_model/tracked_mistral_embeddings.py +++ b/common/eveai_model/tracked_mistral_embeddings.py @@ -9,32 +9,133 @@ from mistralai import Mistral class TrackedMistralAIEmbeddings(EveAIEmbeddings): - def __init__(self, model: str = "mistral_embed"): + def __init__(self, model: str = "mistral_embed", batch_size: int = 10): + """ + Initialize the TrackedMistralAIEmbeddings class. + + Args: + model: The embedding model to use + batch_size: Maximum number of texts to send in a single API call + """ api_key = current_app.config['MISTRAL_API_KEY'] self.client = Mistral( api_key=api_key ) self.model = model + self.batch_size = batch_size super().__init__() def embed_documents(self, texts: list[str]) -> list[list[float]]: - start_time = time.time() - result = self.client.embeddings.create( - model=self.model, - inputs=texts - ) - end_time = time.time() + """ + Embed a list of texts, processing in batches to avoid API limitations. 
- metrics = { - 'total_tokens': result.usage.total_tokens, - 'prompt_tokens': result.usage.prompt_tokens, # For embeddings, all tokens are prompt tokens - 'completion_tokens': result.usage.completion_tokens, - 'time_elapsed': end_time - start_time, - 'interaction_type': 'Embedding', - } - current_event.log_llm_metrics(metrics) + Args: + texts: A list of texts to embed - embeddings = [embedding.embedding for embedding in result.data] + Returns: + A list of embeddings, one for each input text + """ + if not texts: + return [] - return embeddings + all_embeddings = [] + + # Process texts in batches + for i in range(0, len(texts), self.batch_size): + batch = texts[i:i + self.batch_size] + batch_num = i // self.batch_size + 1 + current_app.logger.debug(f"Processing embedding batch {batch_num}, size: {len(batch)}") + + start_time = time.time() + try: + result = self.client.embeddings.create( + model=self.model, + inputs=batch + ) + end_time = time.time() + batch_time = end_time - start_time + + batch_embeddings = [embedding.embedding for embedding in result.data] + all_embeddings.extend(batch_embeddings) + + # Log metrics for this batch + metrics = { + 'total_tokens': result.usage.total_tokens, + 'prompt_tokens': result.usage.prompt_tokens, + 'completion_tokens': result.usage.completion_tokens, + 'time_elapsed': batch_time, + 'interaction_type': 'Embedding', + 'batch': batch_num, + 'batch_size': len(batch) + } + current_event.log_llm_metrics(metrics) + + current_app.logger.debug(f"Batch {batch_num} processed: {len(batch)} texts, " + f"{result.usage.total_tokens} tokens, {batch_time:.2f}s") + + # If processing multiple batches, add a small delay to avoid rate limits + if len(texts) > self.batch_size and i + self.batch_size < len(texts): + time.sleep(0.25) # 250ms pause between batches + + except Exception as e: + current_app.logger.error(f"Error in embedding batch {batch_num}: {str(e)}") + # If a batch fails, try to process each text individually + for j, text in enumerate(batch): + try: + current_app.logger.debug(f"Attempting individual embedding for item {i + j}") + single_start_time = time.time() + single_result = self.client.embeddings.create( + model=self.model, + inputs=[text] + ) + single_end_time = time.time() + + # Add the single embedding + single_embedding = single_result.data[0].embedding + all_embeddings.append(single_embedding) + + # Log metrics for this individual embedding + single_metrics = { + 'total_tokens': single_result.usage.total_tokens, + 'prompt_tokens': single_result.usage.prompt_tokens, + 'completion_tokens': single_result.usage.completion_tokens, + 'time_elapsed': single_end_time - single_start_time, + 'interaction_type': 'Embedding', + 'batch': f"{batch_num}-recovery-{j}", + 'batch_size': 1 + } + current_event.log_llm_metrics(single_metrics) + + except Exception as inner_e: + current_app.logger.error(f"Failed to embed individual text at index {i + j}: {str(inner_e)}") + # Add a zero vector as a placeholder for failed embeddings + # Use the correct dimensionality for the model (1024 for mistral_embed) + embedding_dim = 1024 + all_embeddings.append([0.0] * embedding_dim) + + total_batches = (len(texts) + self.batch_size - 1) // self.batch_size + current_app.logger.info(f"Embedded {len(texts)} texts in {total_batches} batches") + + return all_embeddings + + # def embed_documents(self, texts: list[str]) -> list[list[float]]: + # start_time = time.time() + # result = self.client.embeddings.create( + # model=self.model, + # inputs=texts + # ) + # end_time = time.time() + # 
+ # metrics = { + # 'total_tokens': result.usage.total_tokens, + # 'prompt_tokens': result.usage.prompt_tokens, # For embeddings, all tokens are prompt tokens + # 'completion_tokens': result.usage.completion_tokens, + # 'time_elapsed': end_time - start_time, + # 'interaction_type': 'Embedding', + # } + # current_event.log_llm_metrics(metrics) + # + # embeddings = [embedding.embedding for embedding in result.data] + # + # return embeddings diff --git a/common/eveai_model/tracked_mistral_ocr_client.py b/common/eveai_model/tracked_mistral_ocr_client.py new file mode 100644 index 0000000..e1c721d --- /dev/null +++ b/common/eveai_model/tracked_mistral_ocr_client.py @@ -0,0 +1,53 @@ +import re +import time + +from flask import current_app +from mistralai import Mistral + +from common.utils.business_event_context import current_event + + +class TrackedMistralOcrClient: + def __init__(self): + api_key = current_app.config['MISTRAL_API_KEY'] + self.client = Mistral( + api_key=api_key, + ) + self.model = "mistral-ocr-latest" + + def _get_title(self, markdown): + # Look for the first level-1 heading + match = re.search(r'^# (.+)', markdown, re.MULTILINE) + return match.group(1).strip() if match else None + + def process_pdf(self, file_name, file_content): + start_time = time.time() + uploaded_pdf = self.client.files.upload( + file={ + "file_name": file_name, + "content": file_content + }, + purpose="ocr" + ) + signed_url = self.client.files.get_signed_url(file_id=uploaded_pdf.id) + ocr_response = self.client.ocr.process( + model=self.model, + document={ + "type": "document_url", + "document_url": signed_url.url + }, + include_image_base64=False + ) + nr_of_pages = len(ocr_response.pages) + all_markdown = " ".join(page.markdown for page in ocr_response.pages) + title = self._get_title(all_markdown) + end_time = time.time() + + metrics = { + 'nr_of_pages': nr_of_pages, + 'time_elapsed': end_time - start_time, + 'interaction_type': 'OCR', + } + current_event.log_llm_metrics(metrics) + + return all_markdown, title diff --git a/common/models/entitlements.py b/common/models/entitlements.py index 3690574..d33f98b 100644 --- a/common/models/entitlements.py +++ b/common/models/entitlements.py @@ -25,6 +25,7 @@ class BusinessEventLog(db.Model): llm_metrics_prompt_tokens = db.Column(db.Integer) llm_metrics_completion_tokens = db.Column(db.Integer) llm_metrics_total_time = db.Column(db.Float) + llm_metrics_nr_of_pages = db.Column(db.Integer) llm_metrics_call_count = db.Column(db.Integer) llm_interaction_type = db.Column(db.String(20)) message = db.Column(db.Text) diff --git a/common/utils/business_event.py b/common/utils/business_event.py index 2e5b150..a505a89 100644 --- a/common/utils/business_event.py +++ b/common/utils/business_event.py @@ -106,6 +106,7 @@ class BusinessEvent: 'total_tokens': 0, 'prompt_tokens': 0, 'completion_tokens': 0, + 'nr_of_pages': 0, 'total_time': 0, 'call_count': 0, 'interaction_type': None @@ -121,13 +122,6 @@ class BusinessEvent: if self.specialist_type_version else "" self.span_name_str = "" - current_app.logger.debug(f"Labels for metrics: " - f"tenant_id={self.tenant_id_str}, " - f"event_type={self.event_type_str}," - f"specialist_id={self.specialist_id_str}, " - f"specialist_type={self.specialist_type_str}, " + - f"specialist_type_version={self.specialist_type_version_str}") - # Increment concurrent events gauge when initialized CONCURRENT_TRACES.labels( tenant_id=self.tenant_id_str, @@ -168,24 +162,17 @@ class BusinessEvent: raise AttributeError(f"'{self.__class__.__name__}' 
object has no attribute '{attribute}'") def update_llm_metrics(self, metrics: dict): - self.llm_metrics['total_tokens'] += metrics['total_tokens'] - self.llm_metrics['prompt_tokens'] += metrics['prompt_tokens'] - self.llm_metrics['completion_tokens'] += metrics['completion_tokens'] - self.llm_metrics['total_time'] += metrics['time_elapsed'] + self.llm_metrics['total_tokens'] += metrics.get('total_tokens', 0) + self.llm_metrics['prompt_tokens'] += metrics.get('prompt_tokens', 0) + self.llm_metrics['completion_tokens'] += metrics.get('completion_tokens', 0) + self.llm_metrics['nr_of_pages'] += metrics.get('nr_of_pages', 0) + self.llm_metrics['total_time'] += metrics.get('time_elapsed', 0) self.llm_metrics['call_count'] += 1 self.llm_metrics['interaction_type'] = metrics['interaction_type'] # Track in Prometheus metrics interaction_type_str = sanitize_label(metrics['interaction_type']) if metrics['interaction_type'] else "" - current_app.logger.debug(f"Labels for metrics: " - f"tenant_id={self.tenant_id_str}, " - f"event_type={self.event_type_str}," - f"interaction_type={interaction_type_str}, " - f"specialist_id={self.specialist_id_str}, " - f"specialist_type={self.specialist_type_str}, " - f"specialist_type_version={self.specialist_type_version_str}") - # Track token usage LLM_TOKENS_COUNTER.labels( tenant_id=self.tenant_id_str, @@ -195,7 +182,7 @@ class BusinessEvent: specialist_id=self.specialist_id_str, specialist_type=self.specialist_type_str, specialist_type_version=self.specialist_type_version_str - ).inc(metrics['total_tokens']) + ).inc(metrics.get('total_tokens', 0)) LLM_TOKENS_COUNTER.labels( tenant_id=self.tenant_id_str, @@ -205,7 +192,7 @@ class BusinessEvent: specialist_id=self.specialist_id_str, specialist_type=self.specialist_type_str, specialist_type_version=self.specialist_type_version_str - ).inc(metrics['prompt_tokens']) + ).inc(metrics.get('prompt_tokens', 0)) LLM_TOKENS_COUNTER.labels( tenant_id=self.tenant_id_str, @@ -215,7 +202,7 @@ class BusinessEvent: specialist_id=self.specialist_id_str, specialist_type=self.specialist_type_str, specialist_type_version=self.specialist_type_version_str - ).inc(metrics['completion_tokens']) + ).inc(metrics.get('completion_tokens', 0)) # Track duration LLM_DURATION.labels( @@ -225,7 +212,7 @@ class BusinessEvent: specialist_id=self.specialist_id_str, specialist_type=self.specialist_type_str, specialist_type_version=self.specialist_type_version_str - ).observe(metrics['time_elapsed']) + ).observe(metrics.get('time_elapsed', 0)) # Track call count LLM_CALLS_COUNTER.labels( @@ -243,6 +230,7 @@ class BusinessEvent: self.llm_metrics['total_tokens'] = 0 self.llm_metrics['prompt_tokens'] = 0 self.llm_metrics['completion_tokens'] = 0 + self.llm_metrics['nr_of_pages'] = 0 self.llm_metrics['total_time'] = 0 self.llm_metrics['call_count'] = 0 self.llm_metrics['interaction_type'] = None @@ -270,14 +258,6 @@ class BusinessEvent: # Track start time for the span span_start_time = time.time() - current_app.logger.debug(f"Labels for metrics: " - f"tenant_id={self.tenant_id_str}, " - f"event_type={self.event_type_str}, " - f"activity_name={self.span_name_str}, " - f"specialist_id={self.specialist_id_str}, " - f"specialist_type={self.specialist_type_str}, " - f"specialist_type_version={self.specialist_type_version_str}") - # Increment span metrics - using span_name as activity_name for metrics SPAN_COUNTER.labels( tenant_id=self.tenant_id_str, @@ -363,14 +343,6 @@ class BusinessEvent: # Track start time for the span span_start_time = time.time() - 
current_app.logger.debug(f"Labels for metrics: " - f"tenant_id={self.tenant_id_str}, " - f"event_type={self.event_type_str}, " - f"activity_name={self.span_name_str}, " - f"specialist_id={self.specialist_id_str}, " - f"specialist_type={self.specialist_type_str}, " - f"specialist_type_version={self.specialist_type_version_str}") - # Increment span metrics - using span_name as activity_name for metrics SPAN_COUNTER.labels( tenant_id=self.tenant_id_str, @@ -487,10 +459,11 @@ class BusinessEvent: 'specialist_type': self.specialist_type, 'specialist_type_version': self.specialist_type_version, 'environment': self.environment, - 'llm_metrics_total_tokens': metrics['total_tokens'], - 'llm_metrics_prompt_tokens': metrics['prompt_tokens'], - 'llm_metrics_completion_tokens': metrics['completion_tokens'], - 'llm_metrics_total_time': metrics['time_elapsed'], + 'llm_metrics_total_tokens': metrics.get('total_tokens', 0), + 'llm_metrics_prompt_tokens': metrics.get('prompt_tokens', 0), + 'llm_metrics_completion_tokens': metrics.get('completion_tokens', 0), + 'llm_metrics_nr_of_pages': metrics.get('nr_of_pages', 0), + 'llm_metrics_total_time': metrics.get('time_elapsed', 0), 'llm_interaction_type': metrics['interaction_type'], 'message': message, } @@ -518,6 +491,7 @@ class BusinessEvent: 'llm_metrics_total_tokens': self.llm_metrics['total_tokens'], 'llm_metrics_prompt_tokens': self.llm_metrics['prompt_tokens'], 'llm_metrics_completion_tokens': self.llm_metrics['completion_tokens'], + 'llm_metrics_nr_of_pages': self.llm_metrics['nr_of_pages'], 'llm_metrics_total_time': self.llm_metrics['total_time'], 'llm_metrics_call_count': self.llm_metrics['call_count'], 'llm_interaction_type': self.llm_metrics['interaction_type'], diff --git a/common/utils/model_utils.py b/common/utils/model_utils.py index 2f11300..13abde7 100644 --- a/common/utils/model_utils.py +++ b/common/utils/model_utils.py @@ -135,6 +135,11 @@ def get_crewai_llm(full_model_name='mistral.mistral-large-latest', temperature=0 return llm +def process_pdf(): + full_model_name = 'mistral-ocr-latest' + + + class ModelVariables: """Manages model-related variables and configurations""" diff --git a/eveai_entitlements/tasks.py b/eveai_entitlements/tasks.py index 8950976..2530d25 100644 --- a/eveai_entitlements/tasks.py +++ b/eveai_entitlements/tasks.py @@ -97,6 +97,7 @@ def persist_business_events(log_entries): llm_metrics_total_tokens=entry.pop('llm_metrics_total_tokens', None), llm_metrics_prompt_tokens=entry.pop('llm_metrics_prompt_tokens', None), llm_metrics_completion_tokens=entry.pop('llm_metrics_completion_tokens', None), + llm_metrics_nr_of_pages=entry.pop('llm_metrics_nr_of_pages', None), llm_metrics_total_time=entry.pop('llm_metrics_total_time', None), llm_metrics_call_count=entry.pop('llm_metrics_call_count', None), llm_interaction_type=entry.pop('llm_interaction_type', None), diff --git a/eveai_workers/processors/pdf_processor.py b/eveai_workers/processors/pdf_processor.py index b5e1a39..00d22ed 100644 --- a/eveai_workers/processors/pdf_processor.py +++ b/eveai_workers/processors/pdf_processor.py @@ -7,6 +7,7 @@ from langchain_core.prompts import ChatPromptTemplate import re from langchain_core.runnables import RunnablePassthrough +from common.eveai_model.tracked_mistral_ocr_client import TrackedMistralOcrClient from common.extensions import minio_client from common.utils.model_utils import create_language_template, get_embedding_llm from .base_processor import BaseProcessor @@ -21,6 +22,7 @@ class PDFProcessor(BaseProcessor): self.chunk_size 
= catalog.max_chunk_size self.chunk_overlap = 0 self.tuning = self.processor.tuning + self.ocr_client = TrackedMistralOcrClient() def process(self): self._log("Starting PDF processing") @@ -30,14 +32,10 @@ class PDFProcessor(BaseProcessor): self.document_version.bucket_name, self.document_version.object_name, ) - - with current_event.create_span("PDF Extraction"): - extracted_content = self._extract_content(file_data) - structured_content, title = self._structure_content(extracted_content) + file_name = f"{self.document_version.bucket_name}_{self.document_version.object_name.replace("/", "_")}" with current_event.create_span("Markdown Generation"): - llm_chunks = self._split_content_for_llm(structured_content) - markdown = self._process_chunks_with_llm(llm_chunks) + markdown, title = self.ocr_client.process_pdf(file_name, file_data) self._save_markdown(markdown) self._log("Finished processing PDF") diff --git a/eveai_workers/tasks.py b/eveai_workers/tasks.py index cc3fe2a..d382b22 100644 --- a/eveai_workers/tasks.py +++ b/eveai_workers/tasks.py @@ -144,7 +144,8 @@ def delete_embeddings_for_document_version(document_version): def embed_markdown(tenant, model_variables, document_version, catalog, processor, markdown, title): # Create potential chunks - potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, processor, markdown) + potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, processor, markdown, + catalog.max_chunk_size) processor.log_tuning("Potential Chunks: ", {'potential chunks': potential_chunks}) # Combine chunks for embedding @@ -254,27 +255,286 @@ def embed_chunks(tenant, catalog, document_version, chunks): return new_embeddings -def create_potential_chunks_for_markdown(tenant_id, document_version, processor, markdown): +def create_potential_chunks_for_markdown(tenant_id, document_version, processor, markdown, max_chunk_size=2500): try: current_app.logger.info(f'Creating potential chunks for tenant {tenant_id}') - heading_level = processor.configuration.get('chunking_heading_level', 2) + configured_heading_level = processor.configuration.get('chunking_heading_level', 2) headers_to_split_on = [ - (f"{'#' * i}", f"Header {i}") for i in range(1, min(heading_level + 1, 7)) + (f"{'#' * i}", f"Header {i}") for i in range(1, min(configured_heading_level + 1, 7)) ] processor.log_tuning('Headers to split on', {'header list: ': headers_to_split_on}) markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on, strip_headers=False) md_header_splits = markdown_splitter.split_text(markdown) - potential_chunks = [doc.page_content for doc in md_header_splits] + initial_chunks = [doc.page_content for doc in md_header_splits] + final_chunks = [] + for chunk in initial_chunks: + if len(chunk) <= max_chunk_size: + final_chunks.append(chunk) + else: + # This chunk is too large, split it further + processor.log_tuning('Further splitting required', { + 'chunk_size': len(chunk), + 'max_chunk_size': max_chunk_size + }) - return potential_chunks + # Try splitting on deeper heading levels first + deeper_chunks = split_on_deeper_headings(chunk, configured_heading_level, max_chunk_size) + + # If deeper heading splits still exceed max size, split on paragraphs + chunks_to_process = [] + for deeper_chunk in deeper_chunks: + if len(deeper_chunk) <= max_chunk_size: + chunks_to_process.append(deeper_chunk) + else: + paragraph_chunks = split_on_paragraphs(deeper_chunk, max_chunk_size) + chunks_to_process.extend(paragraph_chunks) + + 
final_chunks.extend(chunks_to_process) + + processor.log_tuning('Final chunks', { + 'initial_chunk_count': len(initial_chunks), + 'final_chunk_count': len(final_chunks) + }) + + return final_chunks except Exception as e: current_app.logger.error(f'Error creating potential chunks for tenant {tenant_id}, with error: {e}') raise +def split_on_deeper_headings(chunk, already_split_level, max_chunk_size): + """ + Split a chunk on deeper heading levels than already used + + Args: + chunk: Markdown chunk to split + already_split_level: Heading level already used for splitting + max_chunk_size: Maximum allowed chunk size + + Returns: + List of chunks split on deeper headings + """ + # Define headers for deeper levels + deeper_headers = [ + (f"{'#' * i}", f"Header {i}") for i in range(already_split_level + 1, 7) + ] + + if not deeper_headers: + # No deeper headers possible, return original chunk + return [chunk] + + splitter = MarkdownHeaderTextSplitter(deeper_headers, strip_headers=False) + try: + splits = splitter.split_text(chunk) + return [doc.page_content for doc in splits] + except Exception: + # If splitting fails, return original chunk + return [chunk] + + +def split_on_paragraphs(chunk, max_chunk_size): + """ + Split a chunk on paragraph boundaries, preserving tables + + Args: + chunk: Markdown chunk to split + max_chunk_size: Maximum allowed chunk size + + Returns: + List of chunks split on paragraph boundaries + """ + # Split the chunk into parts: regular paragraphs and tables + parts = [] + current_part = "" + in_table = False + table_content = "" + + lines = chunk.split('\n') + + for i, line in enumerate(lines): + # Check if this line starts a table + if line.strip().startswith('|') and not in_table: + # Add current content as a part if not empty + if current_part.strip(): + parts.append(('text', current_part)) + current_part = "" + + in_table = True + table_content = line + '\n' + + # Check if we're in a table + elif in_table: + table_content += line + '\n' + + # Check if this line might end the table (empty line after a table line) + if not line.strip() and i > 0 and lines[i - 1].strip().startswith('|'): + parts.append(('table', table_content)) + table_content = "" + in_table = False + + # Regular content + else: + current_part += line + '\n' + + # If we have a blank line, it's a paragraph boundary + if not line.strip() and current_part.strip(): + parts.append(('text', current_part)) + current_part = "" + + # Handle any remaining content + if in_table and table_content.strip(): + parts.append(('table', table_content)) + elif current_part.strip(): + parts.append(('text', current_part)) + + # Now combine parts into chunks that respect max_chunk_size + result_chunks = [] + current_chunk = "" + + for part_type, content in parts: + # If it's a table, we don't want to split it + if part_type == 'table': + # If adding the table would exceed max size, start a new chunk + if len(current_chunk) + len(content) > max_chunk_size: + if current_chunk: + result_chunks.append(current_chunk) + + # If the table itself exceeds max size, we have to split it anyway + if len(content) > max_chunk_size: + # Split table into multiple chunks, trying to keep rows together + table_chunks = split_table(content, max_chunk_size) + result_chunks.extend(table_chunks) + else: + current_chunk = content + else: + current_chunk += content + + # For text parts, we can split more freely + else: + # If text is smaller than max size, try to add it + if len(content) <= max_chunk_size: + if len(current_chunk) + len(content) <= 
max_chunk_size: + current_chunk += content + else: + result_chunks.append(current_chunk) + current_chunk = content + else: + # Text part is too large, split it into paragraphs + if current_chunk: + result_chunks.append(current_chunk) + current_chunk = "" + + # Split by paragraphs (blank lines) + paragraphs = content.split('\n\n') + + for paragraph in paragraphs: + paragraph_with_newlines = paragraph + '\n\n' + + if len(paragraph_with_newlines) > max_chunk_size: + # This single paragraph is too large, split by sentences + sentences = re.split(r'(?<=[.!?])\s+', paragraph) + current_sentence_chunk = "" + + for sentence in sentences: + sentence_with_space = sentence + ' ' + if len(current_sentence_chunk) + len(sentence_with_space) <= max_chunk_size: + current_sentence_chunk += sentence_with_space + else: + if current_sentence_chunk: + result_chunks.append(current_sentence_chunk.strip()) + + # If single sentence exceeds max size, we have to split it + if len(sentence_with_space) > max_chunk_size: + # Split sentence into chunks of max_chunk_size + for i in range(0, len(sentence_with_space), max_chunk_size): + result_chunks.append(sentence_with_space[i:i + max_chunk_size].strip()) + else: + current_sentence_chunk = sentence_with_space + + if current_sentence_chunk: + result_chunks.append(current_sentence_chunk.strip()) + + elif len(current_chunk) + len(paragraph_with_newlines) <= max_chunk_size: + current_chunk += paragraph_with_newlines + else: + if current_chunk: + result_chunks.append(current_chunk.strip()) + current_chunk = paragraph_with_newlines + + # Add the last chunk if there's anything left + if current_chunk: + result_chunks.append(current_chunk.strip()) + + return result_chunks + + +def split_table(table_content, max_chunk_size): + """ + Split a table into multiple chunks, trying to keep rows together + + Args: + table_content: Markdown table content + max_chunk_size: Maximum allowed chunk size + + Returns: + List of table chunks + """ + lines = table_content.split('\n') + header_rows = [] + + # Find the header rows (usually first two rows: content and separator) + for i, line in enumerate(lines): + if i < 2 and line.strip().startswith('|'): + header_rows.append(line) + elif i == 2: + break + + header = '\n'.join(header_rows) + '\n' if header_rows else '' + + # If even the header is too big, we have a problem + if len(header) > max_chunk_size: + # Just split the table content regardless of rows + chunks = [] + current_chunk = "" + + for line in lines: + if len(current_chunk) + len(line) + 1 <= max_chunk_size: + current_chunk += line + '\n' + else: + chunks.append(current_chunk) + current_chunk = line + '\n' + + if current_chunk: + chunks.append(current_chunk) + + return chunks + + # Split the table with proper headers + chunks = [] + current_chunk = header + + for i, line in enumerate(lines): + # Skip header rows + if i < len(header_rows): + continue + + # If this row fits, add it + if len(current_chunk) + len(line) + 1 <= max_chunk_size: + current_chunk += line + '\n' + else: + # This row doesn't fit, start a new chunk + chunks.append(current_chunk) + current_chunk = header + line + '\n' + + if current_chunk != header: + chunks.append(current_chunk) + + return chunks + + def combine_chunks_for_markdown(potential_chunks, min_chars, max_chars, processor): actual_chunks = [] current_chunk = "" @@ -325,6 +585,7 @@ def combine_chunks_for_markdown(potential_chunks, min_chars, max_chars, processo # Force new chunk if pattern matches if chunking_patterns and matches_chunking_pattern(chunk, 
chunking_patterns): if current_chunk and current_length >= min_chars: + current_app.logger.debug(f"Chunk Length of chunk to embed: {len(current_chunk)} ") actual_chunks.append(current_chunk) current_chunk = chunk current_length = chunk_length @@ -332,6 +593,7 @@ def combine_chunks_for_markdown(potential_chunks, min_chars, max_chars, processo if current_length + chunk_length > max_chars: if current_length >= min_chars: + current_app.logger.debug(f"Chunk Length of chunk to embed: {len(current_chunk)} ") actual_chunks.append(current_chunk) current_chunk = chunk current_length = chunk_length @@ -345,6 +607,7 @@ def combine_chunks_for_markdown(potential_chunks, min_chars, max_chars, processo # Handle the last chunk if current_chunk and current_length >= 0: + current_app.logger.debug(f"Chunk Length of chunk to embed: {len(current_chunk)} ") actual_chunks.append(current_chunk) return actual_chunks diff --git a/migrations/public/versions/605395afc22f_add_nr_of_pages_to_llm_metrics_in_.py b/migrations/public/versions/605395afc22f_add_nr_of_pages_to_llm_metrics_in_.py new file mode 100644 index 0000000..5ab4e1d --- /dev/null +++ b/migrations/public/versions/605395afc22f_add_nr_of_pages_to_llm_metrics_in_.py @@ -0,0 +1,31 @@ +"""Add nr_of_pages to llm_metrics in BusinessEvent + +Revision ID: 605395afc22f +Revises: cfee2c5bcd7a +Create Date: 2025-04-16 07:25:43.959618 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '605395afc22f' +down_revision = 'cfee2c5bcd7a' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('business_event_log', schema=None) as batch_op: + batch_op.add_column(sa.Column('llm_metrics_nr_of_pages', sa.Integer(), nullable=True)) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + with op.batch_alter_table('business_event_log', schema=None) as batch_op: + batch_op.drop_column('llm_metrics_nr_of_pages') + + # ### end Alembic commands ### diff --git a/requirements.txt b/requirements.txt index eb4d55f..e1b5524 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ -alembic~=1.14.1 +alembic~=1.15.2 annotated-types~=0.7.0 -bcrypt~=4.1.3 -beautifulsoup4~=4.12.3 +bcrypt~=4.3.0 +beautifulsoup4~=4.13.4 celery~=5.4.0 certifi~=2024.7.4 chardet~=5.2.0 @@ -9,29 +9,29 @@ cors~=1.0.1 Flask~=3.1.0 Flask-BabelEx~=0.9.4 Flask-Bootstrap~=3.3.7.1 -Flask-Cors~=5.0.0 +Flask-Cors~=5.0.1 Flask-JWT-Extended~=4.7.1 Flask-Login~=0.6.3 flask-mailman~=1.1.1 Flask-Migrate~=4.1.0 Flask-Principal~=0.4.0 -Flask-Security-Too~=5.6.0 +Flask-Security-Too~=5.6.1 Flask-Session~=0.8.0 Flask-SQLAlchemy~=3.1.1 Flask-WTF~=1.2.1 -gevent~=24.2.1 +gevent~=24.11.1 gevent-websocket~=0.10.1 greenlet~=3.0.3 gunicorn~=22.0.0 -Jinja2~=3.1.4 +Jinja2~=3.1.6 kombu~=5.3.7 -langchain~=0.3.0 -langchain-anthropic~=0.2.0 -langchain-community~=0.3.0 -langchain-core~=0.3.0 -langchain-mistralai~=0.2.0 -langchain-openai~=0.3.5 -langchain-postgres~=0.0.12 +langchain~=0.3.23 +langchain-anthropic~=0.3.11 +langchain-community~=0.3.21 +langchain-core~=0.3.52 +langchain-mistralai~=0.2.10 +langchain-openai~=0.3.13 +langchain-postgres~=0.0.14 langchain-text-splitters~=0.3.0 langcodes~=3.4.0 langdetect~=1.0.9 @@ -41,7 +41,7 @@ pg8000~=1.31.2 pgvector~=0.2.5 pycryptodome~=3.20.0 pydantic~=2.9.1 -PyJWT~=2.8.0 +PyJWT~=2.10.1 python-dateutil~=2.9.0.post0 python-engineio~=4.9.1 python-iso639~=2024.4.27 @@ -50,11 +50,11 @@ pytz~=2024.1 PyYAML~=6.0.2 redis~=5.0.4 requests~=2.32.3 -SQLAlchemy~=2.0.35 +SQLAlchemy~=2.0.40 tiktoken~=0.7.0 tzdata~=2024.1 urllib3~=2.2.2 -WTForms~=3.1.2 +WTForms~=3.2.1 wtforms-html5~=0.6.1 zxcvbn~=4.4.28 groq~=0.9.0 @@ -84,10 +84,10 @@ typing_extensions~=4.12.2 babel~=2.16.0 dogpile.cache~=1.3.3 python-docx~=1.1.2 -crewai~=0.108.0 +crewai~=0.114.0 sseclient~=0.0.27 termcolor~=2.5.0 mistral-common~=1.5.3 -mistralai~=1.5.0 +mistralai~=1.6.0 contextvars~=2.4 pandas~=2.2.3 \ No newline at end of file
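A minimal usage sketch (not part of the patch itself) of how the new OCR client and batched embeddings introduced above are expected to be wired together. It assumes a Flask application context with MISTRAL_API_KEY configured and an active business event (current_event), since both tracked classes report metrics through it; the helper name ocr_and_embed is illustrative only.

from common.eveai_model.tracked_mistral_embeddings import TrackedMistralAIEmbeddings
from common.eveai_model.tracked_mistral_ocr_client import TrackedMistralOcrClient


def ocr_and_embed(file_name, file_content):
    # OCR the PDF via the new client: returns the concatenated page markdown
    # and the first level-1 heading as the title (or None if no heading exists).
    ocr_client = TrackedMistralOcrClient()
    markdown, title = ocr_client.process_pdf(file_name, file_content)

    # Embed the extracted markdown; requests are sent in batches of at most
    # `batch_size` texts per API call, with per-batch metrics logged.
    embedder = TrackedMistralAIEmbeddings(model="mistral_embed", batch_size=10)
    embeddings = embedder.embed_documents([markdown])

    return title, markdown, embeddings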