# eveAI/eveai_workers/tasks.py
import re
from datetime import datetime as dt, timezone as tz
from celery import states
from flask import current_app
# LangChain imports
from langchain.text_splitter import MarkdownHeaderTextSplitter
from langchain_core.exceptions import LangChainException
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from sqlalchemy import or_
from sqlalchemy.exc import SQLAlchemyError, InterfaceError, OperationalError
from redis.exceptions import ConnectionError as RedisConnectionError, TimeoutError as RedisTimeoutError
import traceback
from common.extensions import db, cache_manager
from common.models.document import DocumentVersion, Embedding, Document, Processor, Catalog
from common.models.user import Tenant
from common.utils.celery_utils import current_celery
from common.utils.database import Database
from common.utils.model_utils import create_language_template, get_embedding_model_and_class, get_template
from common.utils.business_event import BusinessEvent
from common.utils.business_event_context import current_event
from config.type_defs.processor_types import PROCESSOR_TYPES
from eveai_workers.processors.processor_registry import ProcessorRegistry
from common.utils.eveai_exceptions import EveAIInvalidEmbeddingModel, EveAINoContentFound, EveAIUnsupportedFileType, \
EveAINoProcessorFound
from common.utils.config_field_types import json_to_pattern_list
# Healthcheck task
@current_celery.task(bind=True, name='ping', queue='embeddings',
autoretry_for=(InterfaceError, OperationalError, RedisConnectionError, RedisTimeoutError, OSError),
retry_backoff=True, retry_jitter=True, max_retries=5)
def ping(self):
    return 'pong'
@current_celery.task(bind=True, name='create_embeddings', queue='embeddings',
autoretry_for=(InterfaceError, OperationalError, RedisConnectionError, RedisTimeoutError, OSError),
retry_backoff=True, retry_jitter=True, max_retries=5)
def create_embeddings(self, tenant_id, document_version_id):
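    """
    Celery task: run the configured processor for a document version, chunk and enrich the
    resulting markdown, and store the embeddings.
    """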
document_version = None
try:
# Retrieve Tenant for which we are processing
tenant = Tenant.query.get(tenant_id)
if tenant is None:
raise Exception(f'Tenant {tenant_id} not found')
# Ensure we are working in the correct database schema
Database(tenant_id).switch_schema()
# Retrieve document version to process
document_version = DocumentVersion.query.get(document_version_id)
if document_version is None:
raise Exception(f'Document version {document_version_id} not found')
# Retrieve the Catalog ID
doc = Document.query.get_or_404(document_version.doc_id)
catalog_id = doc.catalog_id
catalog = Catalog.query.get_or_404(catalog_id)
# Define processor related information
processor = get_processor_for_document(catalog_id, document_version.file_type, document_version.sub_file_type)
processor_class = ProcessorRegistry.get_processor_class(processor.type)
except Exception as e:
current_app.logger.error(f'Create Embeddings request received '
f'for badly configured document version {document_version_id} '
f'for tenant {tenant_id}, '
f'error: {e}')
        if document_version:
            document_version.processing_error = str(e)[:255]
            db.session.commit()
        raise
    # BusinessEvent establishes an event context, so it must be used in a with block
with BusinessEvent('Create Embeddings', tenant_id,
document_version_id=document_version_id,
document_version_file_size=document_version.file_size):
current_app.logger.info(f'Creating embeddings for tenant {tenant_id} on document version {document_version_id}')
try:
db.session.add(document_version)
# start processing
document_version.processing = True
document_version.processing_started_at = dt.now(tz.utc)
document_version.processing_finished_at = None
document_version.processing_error = None
db.session.commit()
        except SQLAlchemyError as e:
            current_app.logger.error(f'Unable to save Embedding status information '
                                     f'in document version {document_version_id} '
                                     f'for tenant {tenant_id}, '
                                     f'error: {e}')
            raise
delete_embeddings_for_document_version(document_version)
try:
with current_event.create_span(f"{processor.type} Processing"):
document_processor = processor_class(
tenant=tenant,
document_version=document_version,
catalog=catalog,
processor=processor
)
markdown, title = document_processor.process()
document_processor.log_tuning("Processor returned: ", {
'markdown': markdown,
'title': title
})
if not markdown or markdown.strip() == '':
raise EveAINoContentFound(document_version.doc_id, document_version.id)
with current_event.create_span("Embedding"):
embed_markdown(tenant, document_version, catalog, document_processor, markdown, title)
current_event.log("Finished Embedding Creation Task")
except Exception as e:
stacktrace = traceback.format_exc()
current_app.logger.error(f'Error creating embeddings for tenant {tenant_id} '
f'on document version {document_version_id} '
f'error: {e}\n'
f'Stacktrace: {stacktrace}')
document_version.processing = False
document_version.processing_finished_at = dt.now(tz.utc)
document_version.processing_error = str(e)[:255]
db.session.commit()
self.update_state(state=states.FAILURE)
raise
def delete_embeddings_for_document_version(document_version):
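    """Delete any existing embeddings for the given document version before new ones are created."""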
embeddings_to_delete = db.session.query(Embedding).filter_by(doc_vers_id=document_version.id).all()
for embedding in embeddings_to_delete:
db.session.delete(embedding)
try:
db.session.commit()
current_app.logger.info(f'Deleted embeddings for document version {document_version.id}')
    except SQLAlchemyError as e:
        current_app.logger.error(f'Unable to delete embeddings for document version {document_version.id}, '
                                 f'error: {e}')
raise
def embed_markdown(tenant, document_version, catalog, processor, markdown, title):
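    """Chunk the markdown, enrich the chunks with document context, embed them, and persist the results."""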
# Create potential chunks
potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, processor, markdown,
catalog.max_chunk_size)
processor.log_tuning("Potential Chunks: ", {'potential chunks': potential_chunks})
# Combine chunks for embedding
chunks = combine_chunks_for_markdown(potential_chunks, catalog.min_chunk_size, catalog.max_chunk_size, processor)
processor.log_tuning("Chunks: ", {'chunks': chunks})
# Enrich chunks
with current_event.create_span("Enrich Chunks"):
enriched_chunks = enrich_chunks(tenant, document_version, title, chunks)
processor.log_tuning("Enriched Chunks: ", {'enriched_chunks': enriched_chunks})
# Create embeddings
with current_event.create_span("Create Embeddings"):
embeddings = embed_chunks(tenant, catalog, document_version, enriched_chunks)
# Update document version and save embeddings
try:
db.session.add(document_version)
document_version.processing_finished_at = dt.now(tz.utc)
document_version.processing = False
db.session.add_all(embeddings)
db.session.commit()
except SQLAlchemyError as e:
        current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} '
                                 f'on document version {document_version.id}, '
                                 f'error: {e}')
raise
current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} '
f'on document version {document_version.id} :-)')
def enrich_chunks(tenant, document_version, title, chunks):
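    """
    Prefix each chunk with document context (filename, user/system context and metadata, and title).
    Chunks after the first also receive a generated summary of the first chunk.
    """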
summary = ''
if len(chunks) > 1:
summary = summarize_chunk(tenant, document_version, chunks[0])
chunk_total_context = (f'Filename: {document_version.object_name}\n'
f'User Context:\n{document_version.user_context}\n\n'
f'User Metadata:\n{document_version.user_metadata}\n\n'
f'Title: {title}\n'
f'Summary:\n{summary}\n'
f'System Context:\n{document_version.system_context}\n\n'
f'System Metadata:\n{document_version.system_metadata}\n\n'
)
enriched_chunks = []
initial_chunk = (f'Filename: {document_version.object_name}\n'
f'User Context:\n{document_version.user_context}\n\n'
f'User Metadata:\n{document_version.user_metadata}\n\n'
f'Title: {title}\n'
f'System Context:\n{document_version.system_context}\n\n'
f'System Metadata:\n{document_version.system_metadata}\n\n'
f'{chunks[0]}'
)
enriched_chunks.append(initial_chunk)
for chunk in chunks[1:]:
enriched_chunk = f'{chunk_total_context}\n{chunk}'
enriched_chunks.append(enriched_chunk)
return enriched_chunks
def summarize_chunk(tenant, document_version, chunk):
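    """Summarize a chunk using the configured 'summary' template and LLM, in the document's language."""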
current_event.log("Starting Summarizing Chunk")
template, llm = get_template("summary")
language_template = create_language_template(template, document_version.language)
summary_prompt = ChatPromptTemplate.from_template(language_template)
setup = RunnablePassthrough()
output_parser = StrOutputParser()
chain = setup | summary_prompt | llm | output_parser
try:
summary = chain.invoke({"text": chunk})
current_event.log("Finished Summarizing Chunk")
return summary
except LangChainException as e:
current_app.logger.error(f'Error creating summary for chunk enrichment for tenant {tenant.id} '
f'on document version {document_version.id} '
f'error: {e}')
raise
def embed_chunks(tenant, catalog, document_version, chunks):
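    """Embed the chunks with the catalog's embedding model and return the new (unsaved) Embedding records."""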
embedding_model, embedding_model_class = get_embedding_model_and_class(tenant.id, catalog.id)
# Actually embed
try:
embeddings = embedding_model.embed_documents(chunks)
except LangChainException as e:
        current_app.logger.error(f'Error creating embeddings for tenant {tenant.id} '
                                 f'on document version {document_version.id} '
                                 f'while calling the embedding model API, '
                                 f'error: {e}')
raise
# Add embeddings to the database
new_embeddings = []
for chunk, embedding in zip(chunks, embeddings):
new_embedding = embedding_model_class()
new_embedding.document_version = document_version
new_embedding.active = True
new_embedding.chunk = chunk
new_embedding.embedding = embedding
new_embeddings.append(new_embedding)
return new_embeddings
def create_potential_chunks_for_markdown(tenant_id, document_version, processor, markdown, max_chunk_size=2500):
try:
current_app.logger.info(f'Creating potential chunks for tenant {tenant_id}')
configured_heading_level = processor.configuration.get('chunking_heading_level', 2)
headers_to_split_on = [
(f"{'#' * i}", f"Header {i}") for i in range(1, min(configured_heading_level + 1, 7))
]
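        # e.g. with the default chunking_heading_level of 2, this yields [("#", "Header 1"), ("##", "Header 2")]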
processor.log_tuning('Headers to split on', {'header list: ': headers_to_split_on})
markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on, strip_headers=False)
md_header_splits = markdown_splitter.split_text(markdown)
initial_chunks = [doc.page_content for doc in md_header_splits]
final_chunks = []
for chunk in initial_chunks:
if len(chunk) <= max_chunk_size:
final_chunks.append(chunk)
else:
# This chunk is too large, split it further
processor.log_tuning('Further splitting required', {
'chunk_size': len(chunk),
'max_chunk_size': max_chunk_size
})
# Try splitting on deeper heading levels first
deeper_chunks = split_on_deeper_headings(chunk, configured_heading_level, max_chunk_size)
# If deeper heading splits still exceed max size, split on paragraphs
chunks_to_process = []
for deeper_chunk in deeper_chunks:
if len(deeper_chunk) <= max_chunk_size:
chunks_to_process.append(deeper_chunk)
else:
paragraph_chunks = split_on_paragraphs(deeper_chunk, max_chunk_size)
chunks_to_process.extend(paragraph_chunks)
final_chunks.extend(chunks_to_process)
processor.log_tuning('Final chunks', {
'initial_chunk_count': len(initial_chunks),
'final_chunk_count': len(final_chunks)
})
return final_chunks
except Exception as e:
current_app.logger.error(f'Error creating potential chunks for tenant {tenant_id}, with error: {e}')
raise
def split_on_deeper_headings(chunk, already_split_level, max_chunk_size):
"""
Split a chunk on deeper heading levels than already used
Args:
chunk: Markdown chunk to split
already_split_level: Heading level already used for splitting
max_chunk_size: Maximum allowed chunk size
Returns:
List of chunks split on deeper headings
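    Note:
        max_chunk_size is currently not used here; chunk sizes are enforced by the caller.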
"""
# Define headers for deeper levels
deeper_headers = [
(f"{'#' * i}", f"Header {i}") for i in range(already_split_level + 1, 7)
]
if not deeper_headers:
# No deeper headers possible, return original chunk
return [chunk]
splitter = MarkdownHeaderTextSplitter(deeper_headers, strip_headers=False)
try:
splits = splitter.split_text(chunk)
return [doc.page_content for doc in splits]
except Exception:
# If splitting fails, return original chunk
return [chunk]
def split_on_paragraphs(chunk, max_chunk_size):
"""
Split a chunk on paragraph boundaries, preserving tables
Args:
chunk: Markdown chunk to split
max_chunk_size: Maximum allowed chunk size
Returns:
List of chunks split on paragraph boundaries
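    Note:
        Consecutive paragraphs are kept together in one chunk as long as their combined length
        stays within max_chunk_size; tables are kept intact where possible.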
"""
# Split the chunk into parts: regular paragraphs and tables
parts = []
current_part = ""
in_table = False
table_content = ""
lines = chunk.split('\n')
for i, line in enumerate(lines):
# Check if this line starts a table
if line.strip().startswith('|') and not in_table:
# Add current content as a part if not empty
if current_part.strip():
parts.append(('text', current_part))
current_part = ""
in_table = True
table_content = line + '\n'
# Check if we're in a table
elif in_table:
table_content += line + '\n'
# Check if this line might end the table (empty line after a table line)
if not line.strip() and i > 0 and lines[i - 1].strip().startswith('|'):
parts.append(('table', table_content))
table_content = ""
in_table = False
# Regular content
else:
current_part += line + '\n'
# If we have a blank line, it's a paragraph boundary
if not line.strip() and current_part.strip():
parts.append(('text', current_part))
current_part = ""
# Handle any remaining content
if in_table and table_content.strip():
parts.append(('table', table_content))
elif current_part.strip():
parts.append(('text', current_part))
# Now combine parts into chunks that respect max_chunk_size
result_chunks = []
current_chunk = ""
for part_type, content in parts:
# If it's a table, we don't want to split it
if part_type == 'table':
# If adding the table would exceed max size, start a new chunk
if len(current_chunk) + len(content) > max_chunk_size:
if current_chunk:
result_chunks.append(current_chunk)
                # If the table itself exceeds max size, we have to split it anyway
                if len(content) > max_chunk_size:
                    # Split table into multiple chunks, trying to keep rows together
                    table_chunks = split_table(content, max_chunk_size)
                    result_chunks.extend(table_chunks)
                    # Reset so the already-flushed content is not appended again later
                    current_chunk = ""
                else:
                    current_chunk = content
else:
current_chunk += content
# For text parts, we can split more freely
else:
# If text is smaller than max size, try to add it
if len(content) <= max_chunk_size:
if len(current_chunk) + len(content) <= max_chunk_size:
current_chunk += content
else:
result_chunks.append(current_chunk)
current_chunk = content
else:
# Text part is too large, split it into paragraphs
if current_chunk:
result_chunks.append(current_chunk)
current_chunk = ""
# Split by paragraphs (blank lines)
paragraphs = content.split('\n\n')
for paragraph in paragraphs:
paragraph_with_newlines = paragraph + '\n\n'
if len(paragraph_with_newlines) > max_chunk_size:
# This single paragraph is too large, split by sentences
sentences = re.split(r'(?<=[.!?])\s+', paragraph)
current_sentence_chunk = ""
for sentence in sentences:
sentence_with_space = sentence + ' '
if len(current_sentence_chunk) + len(sentence_with_space) <= max_chunk_size:
current_sentence_chunk += sentence_with_space
else:
if current_sentence_chunk:
result_chunks.append(current_sentence_chunk.strip())
                                # If single sentence exceeds max size, we have to split it
                                if len(sentence_with_space) > max_chunk_size:
                                    # Split sentence into chunks of max_chunk_size
                                    for i in range(0, len(sentence_with_space), max_chunk_size):
                                        result_chunks.append(sentence_with_space[i:i + max_chunk_size].strip())
                                    # Reset so the flushed sentences are not repeated in the next iteration
                                    current_sentence_chunk = ""
                                else:
                                    current_sentence_chunk = sentence_with_space
if current_sentence_chunk:
result_chunks.append(current_sentence_chunk.strip())
elif len(current_chunk) + len(paragraph_with_newlines) <= max_chunk_size:
current_chunk += paragraph_with_newlines
else:
if current_chunk:
result_chunks.append(current_chunk.strip())
current_chunk = paragraph_with_newlines
# Add the last chunk if there's anything left
if current_chunk:
result_chunks.append(current_chunk.strip())
return result_chunks
def split_table(table_content, max_chunk_size):
"""
Split a table into multiple chunks, trying to keep rows together
Args:
table_content: Markdown table content
max_chunk_size: Maximum allowed chunk size
Returns:
List of table chunks
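    Note:
        The header rows (content and separator) are repeated at the start of every chunk
        whenever they fit within max_chunk_size.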
"""
lines = table_content.split('\n')
header_rows = []
# Find the header rows (usually first two rows: content and separator)
for i, line in enumerate(lines):
if i < 2 and line.strip().startswith('|'):
header_rows.append(line)
elif i == 2:
break
header = '\n'.join(header_rows) + '\n' if header_rows else ''
# If even the header is too big, we have a problem
if len(header) > max_chunk_size:
# Just split the table content regardless of rows
chunks = []
current_chunk = ""
for line in lines:
if len(current_chunk) + len(line) + 1 <= max_chunk_size:
current_chunk += line + '\n'
else:
chunks.append(current_chunk)
current_chunk = line + '\n'
if current_chunk:
chunks.append(current_chunk)
return chunks
# Split the table with proper headers
chunks = []
current_chunk = header
for i, line in enumerate(lines):
# Skip header rows
if i < len(header_rows):
continue
# If this row fits, add it
if len(current_chunk) + len(line) + 1 <= max_chunk_size:
current_chunk += line + '\n'
else:
# This row doesn't fit, start a new chunk
chunks.append(current_chunk)
current_chunk = header + line + '\n'
if current_chunk != header:
chunks.append(current_chunk)
return chunks
def combine_chunks_for_markdown(potential_chunks, min_chars, max_chars, processor):
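    """
    Greedily combine potential chunks into chunks of roughly min_chars to max_chars characters.

    A new chunk is forced when a potential chunk starts with a header that matches one of the
    configured chunking patterns; otherwise potential chunks are appended until adding the next
    one would exceed max_chars and the current chunk has already reached min_chars.
    """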
actual_chunks = []
current_chunk = ""
current_length = 0
def matches_chunking_pattern(text, patterns):
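        """Return True when the text's first line is a markdown header whose text matches any configured regex pattern."""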
if not patterns:
return False
# Get the first line of the text
first_line = text.split('\n', 1)[0].strip()
        # Check whether the first line is a markdown header (the heading level itself is not used here)
        header_match = re.match(r'^(#{1,6})\s+(.+)$', first_line)
        if not header_match:
            return False
        # Get the header text
        header_text = header_match.group(2)
# Check if header matches any pattern
for pattern in patterns:
try:
processor.log_tuning('Pattern check: ', {
'pattern: ': pattern,
'text': header_text
})
if re.search(pattern, header_text, re.IGNORECASE):
return True
except Exception as e:
current_app.logger.warning(f"Invalid regex pattern '{pattern}': {str(e)}")
continue
return False
chunking_patterns = json_to_pattern_list(processor.configuration.get('chunking_patterns', ""))
processor.log_tuning(f'Chunking Patterns Extraction: ', {
'Full Configuration': processor.configuration,
'Chunking Patterns': chunking_patterns,
})
for chunk in potential_chunks:
chunk_length = len(chunk)
# Force new chunk if pattern matches
if chunking_patterns and matches_chunking_pattern(chunk, chunking_patterns):
if current_chunk and current_length >= min_chars:
actual_chunks.append(current_chunk)
current_chunk = chunk
current_length = chunk_length
continue
if current_length + chunk_length > max_chars:
if current_length >= min_chars:
actual_chunks.append(current_chunk)
current_chunk = chunk
current_length = chunk_length
            else:
                # Current chunk is still below min_chars, so keep adding even though max_chars is exceeded
                current_chunk += f'\n{chunk}'
                current_length += chunk_length
else:
current_chunk += f'\n{chunk}'
current_length += chunk_length
    # Handle the last chunk
    if current_chunk:
actual_chunks.append(current_chunk)
return actual_chunks
def get_processor_for_document(catalog_id: int, file_type: str, sub_file_type: str = None) -> Processor:
"""
Get the appropriate processor for a document based on catalog_id, file_type and optional sub_file_type.
Args:
catalog_id: ID of the catalog
file_type: Type of file (e.g., 'pdf', 'html')
sub_file_type: Optional sub-type for specialized processing
Returns:
Processor instance
    Raises:
        EveAINoProcessorFound: If no matching processor is found
"""
try:
# Start with base query for catalog
query = Processor.query.filter_by(catalog_id=catalog_id).filter_by(active=True)
# If sub_file_type is provided, add that condition
if sub_file_type:
query = query.filter_by(sub_file_type=sub_file_type)
else:
# If no sub_file_type, prefer processors without sub_file_type specification
query = query.filter(or_(Processor.sub_file_type.is_(None),
Processor.sub_file_type == ''))
available_processors = query.all()
if not available_processors:
raise EveAINoProcessorFound(catalog_id, file_type, sub_file_type)
available_processor_types = [processor.type for processor in available_processors]
# Find processor type that handles this file type
matching_processor_type = None
for proc_type, config in PROCESSOR_TYPES.items():
            # Only consider this processor type if it is available in the database
if proc_type in available_processor_types:
supported_types = config['file_types']
if isinstance(supported_types, str):
supported_types = [t.strip() for t in supported_types.split(',')]
if file_type in supported_types:
matching_processor_type = proc_type
break
if not matching_processor_type:
raise EveAINoProcessorFound(catalog_id, file_type, sub_file_type)
processor = None
for proc in available_processors:
if proc.type == matching_processor_type:
processor = proc
break
if not processor:
raise EveAINoProcessorFound(catalog_id, file_type, sub_file_type)
return processor
except Exception as e:
current_app.logger.error(f"Error finding processor: {str(e)}")
raise