- Addition of general chunking parameters chunking_heading_level and chunking_patterns
- Addition of processor types DOCX and Markdown
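Both parameters are read from the processor configuration (see the task changes below). A hypothetical configuration illustrating them — the values are examples; only the keys and defaults come from this commit:

    configuration = {
        # split markdown into potential chunks on '#', '##' and '###' headings (default level: 2)
        'chunking_heading_level': 3,
        # headers matching any of these regexes force the start of a new chunk (default: [])
        'chunking_patterns': [r'^Chapter \d+', r'^Appendix [A-Z]'],
    }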
@@ -46,7 +46,7 @@ class AudioProcessor(TranscriptionBaseProcessor):
         try:
             audio_info = AudioSegment.from_file(temp_file_path, format=self.document_version.file_type)
             total_duration = len(audio_info)
-            self._log_tuning("_compress_audio", {
+            self.log_tuning("_compress_audio", {
                 "Audio Duration (ms)": total_duration,
             })
             segment_length = self.max_compression_duration * 1000  # Convert to milliseconds
@@ -55,7 +55,7 @@ class AudioProcessor(TranscriptionBaseProcessor):
             compressed_segments = AudioSegment.empty()

             for i in range(total_chunks):
-                self._log_tuning("_compress_audio", {
+                self.log_tuning("_compress_audio", {
                     "Segment Nr": f"{i + 1} of {total_chunks}"
                 })
@@ -87,7 +87,7 @@ class AudioProcessor(TranscriptionBaseProcessor):
                 compressed_filename,
                 compressed_buffer.read()
             )
-            self._log_tuning("_compress_audio", {
+            self.log_tuning("_compress_audio", {
                 "Compressed audio to MinIO": compressed_filename
             })
@@ -172,14 +172,14 @@ class AudioProcessor(TranscriptionBaseProcessor):

                     transcriptions.append(trans)

-                    self._log_tuning("_transcribe_audio", {
+                    self.log_tuning("_transcribe_audio", {
                         "Chunk Nr": f"{i + 1} of {total_chunks}",
                         "Segment Duration": segment_duration,
                         "Transcription": trans,
                     })
                 else:
                     self._log("Warning: Received empty transcription", level='warning')
-                    self._log_tuning("_transcribe_audio", {"ERROR": "No transcription"})
+                    self.log_tuning("_transcribe_audio", {"ERROR": "No transcription"})

         except Exception as e:
             self._log(f"Error during transcription: {str(e)}", level='error')
@@ -202,7 +202,7 @@ class AudioProcessor(TranscriptionBaseProcessor):
             transcription_filename,
             full_transcription.encode('utf-8')
         )
-        self._log_tuning(f"Saved transcription to MinIO: {transcription_filename}")
+        self.log_tuning(f"Saved transcription to MinIO: {transcription_filename}")

         return full_transcription

@@ -17,7 +17,7 @@ class BaseProcessor(ABC):
         self.tuning_logger = None
         self._setup_tuning_logger()

-        self._log_tuning("Processor initialized", {
+        self.log_tuning("Processor initialized", {
             "processor_type": processor.type if processor else None,
             "document_version": document_version.id if document_version else None,
             "catalog": catalog.id if catalog else None
@@ -42,6 +42,10 @@ class BaseProcessor(ABC):
     def process(self):
         pass

+    @property
+    def configuration(self):
+        return self.processor.configuration
+
     def _save_markdown(self, markdown):
         markdown_filename = f"{self.document_version.id}.md"
         minio_client.upload_document_file(
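The new configuration property gives subclasses uniform access to processor settings, e.g. (hypothetical call site inside any processor subclass):

    heading_level = self.configuration.get('chunking_heading_level', 2)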
@@ -78,7 +82,7 @@ class BaseProcessor(ABC):

         return markdown

-    def _log_tuning(self, message: str, data: Dict[str, Any] = None) -> None:
+    def log_tuning(self, message: str, data: Dict[str, Any] = None) -> None:
         if self.tuning and self.tuning_logger:
             try:
                 self.tuning_logger.log_tuning('processor', message, data)
129  eveai_workers/processors/docx_processor.py  (new file)
@@ -0,0 +1,129 @@
+import docx
+import io
+from .base_processor import BaseProcessor
+from .processor_registry import ProcessorRegistry
+from common.extensions import minio_client
+import re
+
+
+class DocxProcessor(BaseProcessor):
+    def __init__(self, tenant, model_variables, document_version, catalog, processor):
+        super().__init__(tenant, model_variables, document_version, catalog, processor)
+        self.config = processor.configuration
+        self.extract_comments = self.config.get('extract_comments', False)
+        self.extract_headers_footers = self.config.get('extract_headers_footers', False)
+        self.preserve_formatting = self.config.get('preserve_formatting', True)
+        self.list_style = self.config.get('list_style', 'dash')
+        self.image_handling = self.config.get('image_handling', 'skip')
+        self.table_alignment = self.config.get('table_alignment', 'left')
+
+    def process(self):
+        try:
+            file_data = minio_client.download_document_file(
+                self.tenant.id,
+                self.document_version.bucket_name,
+                self.document_version.object_name,
+            )
+
+            doc = docx.Document(io.BytesIO(file_data))
+            markdown = self._convert_to_markdown(doc)
+            title = self._extract_title(doc)
+
+            self._save_markdown(markdown)
+            return markdown, title
+
+        except Exception as e:
+            self._log(f"Error processing DOCX: {str(e)}", level='error')
+            raise
+
+    def _convert_to_markdown(self, doc):
+        markdown_parts = []
+
+        if self.extract_headers_footers:
+            for section in doc.sections:
+                if section.header.paragraphs:
+                    markdown_parts.extend(self._process_paragraphs(section.header.paragraphs))
+
+        markdown_parts.extend(self._process_paragraphs(doc.paragraphs))
+
+        if self.extract_comments and doc.comments:
+            markdown_parts.append("\n## Comments\n")
+            for comment in doc.comments:
+                markdown_parts.append(f"> {comment.text}\n")
+
+        return "\n".join(markdown_parts)
+
+    def _process_paragraphs(self, paragraphs):
+        markdown_parts = []
+        in_list = False
+
+        for para in paragraphs:
+            if not para.text.strip():
+                continue
+
+            style = para.style.name.lower()
+
+            if 'heading' in style:
+                level = int(style[-1]) if style[-1].isdigit() else 1
+                markdown_parts.append(f"{'#' * level} {para.text}\n")
+
+            elif para._p.pPr and para._p.pPr.numPr:  # List item
+                marker = self._get_list_marker()
+                markdown_parts.append(f"{marker} {para.text}\n")
+                in_list = True
+
+            else:
+                if in_list:
+                    markdown_parts.append("\n")
+                    in_list = False
+
+                text = para.text
+                if self.preserve_formatting:
+                    text = self._apply_formatting(para)
+
+                markdown_parts.append(f"{text}\n")
+
+        return markdown_parts
+
+    def _get_list_marker(self):
+        return {
+            'dash': '-',
+            'asterisk': '*',
+            'plus': '+'
+        }.get(self.list_style, '-')
+
+    def _apply_formatting(self, paragraph):
+        text = paragraph.text
+        if not text:
+            return ""
+
+        runs = paragraph.runs
+        formatted_parts = []
+
+        for run in runs:
+            part = run.text
+            if run.bold:
+                part = f"**{part}**"
+            if run.italic:
+                part = f"*{part}*"
+            if run.underline:
+                part = f"__{part}__"
+            formatted_parts.append(part)
+
+        return "".join(formatted_parts)
+
+    def _extract_title(self, doc):
+        if doc.paragraphs:
+            first_para = doc.paragraphs[0]
+            if 'heading' in first_para.style.name.lower():
+                return first_para.text.strip()
+
+        # Look for first Heading 1 in document
+        for para in doc.paragraphs:
+            if para.style.name.lower() == 'heading 1':
+                return para.text.strip()
+
+        return "Untitled Document"
+
+
+ProcessorRegistry.register("DOCX_PROCESSOR", DocxProcessor)
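A minimal sketch of what the new converter produces, assuming python-docx is installed; the sample document is illustrative and not part of the commit:

    import docx

    doc = docx.Document()
    doc.add_heading("Quarterly Report", level=1)
    doc.add_paragraph("Revenue grew in Q3.")

    # For this doc, _extract_title(doc) returns "Quarterly Report" (the first
    # paragraph uses a heading style), and _convert_to_markdown(doc) emits
    # "# Quarterly Report" followed by the paragraph text.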

@@ -24,7 +24,7 @@ class HTMLProcessor(BaseProcessor):
         # Add verification logging
         self._log(f"HTML Processor initialized with tuning={self.tuning}")
         if self.tuning:
-            self._log_tuning("HTML Processor initialized", {
+            self.log_tuning("HTML Processor initialized", {
                 "html_tags": self.html_tags,
                 "html_end_tags": self.html_end_tags,
                 "included_elements": self.html_included_elements,

@@ -75,7 +75,7 @@ class HTMLProcessor(BaseProcessor):
         title = soup.find('title').get_text(strip=True) if soup.find('title') else ''

         self._log(f'Finished parsing HTML for tenant {self.tenant.id}')
-        self._log_tuning("_parse_html", {"extracted_html": extracted_html, "title": title})
+        self.log_tuning("_parse_html", {"extracted_html": extracted_html, "title": title})
         return extracted_html, title

     def _generate_markdown_from_html(self, html_content):

@@ -96,7 +96,7 @@ class HTMLProcessor(BaseProcessor):
             input_html = {"html": chunk}
             markdown_chunk = chain.invoke(input_html)
             markdown_chunks.append(markdown_chunk)
-            self._log_tuning("_generate_markdown_from_html", {"chunk": chunk, "markdown_chunk": markdown_chunk})
+            self.log_tuning("_generate_markdown_from_html", {"chunk": chunk, "markdown_chunk": markdown_chunk})

         markdown = "\n\n".join(markdown_chunks)
         self._log(f'Finished generating markdown from HTML for tenant {self.tenant.id}')
48  eveai_workers/processors/markdown_processor.py  (new file)
@@ -0,0 +1,48 @@
+from langchain.text_splitter import RecursiveCharacterTextSplitter
+from langchain_core.output_parsers import StrOutputParser
+from langchain_core.prompts import ChatPromptTemplate
+import re
+from langchain_core.runnables import RunnablePassthrough
+
+from common.extensions import minio_client
+from common.utils.model_utils import create_language_template
+from .base_processor import BaseProcessor
+from common.utils.business_event_context import current_event
+from .processor_registry import ProcessorRegistry
+
+
+def _find_first_h1(markdown: str) -> str:
+    # Look for # Header (allowing spaces after #)
+    match = re.search(r'^#\s+(.+)$', markdown, re.MULTILINE)
+    return match.group(1).strip() if match else ""
+
+
+class MarkdownProcessor(BaseProcessor):
+    def __init__(self, tenant, model_variables, document_version, catalog, processor):
+        super().__init__(tenant, model_variables, document_version, catalog, processor)
+
+        self.chunk_size = catalog.max_chunk_size
+        self.chunk_overlap = 0
+        self.tuning = self.processor.tuning
+
+    def process(self):
+        self._log("Starting Markdown processing")
+        try:
+            file_data = minio_client.download_document_file(
+                self.tenant.id,
+                self.document_version.bucket_name,
+                self.document_version.object_name,
+            )
+
+            markdown = file_data.decode('utf-8')
+            title = _find_first_h1(markdown)
+
+            self._save_markdown(markdown)
+            self._log("Finished processing Markdown")
+            return markdown, title
+        except Exception as e:
+            self._log(f"Error processing Markdown: {str(e)}", level='error')
+            raise
+
+
+ProcessorRegistry.register("MARKDOWN_PROCESSOR", MarkdownProcessor)
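For example, _find_first_h1 returns the first ATX level-1 heading anywhere in the file (sample inputs are illustrative):

    _find_first_h1("Preamble\n\n# Getting Started\n\n## Install")   # -> "Getting Started"
    _find_first_h1("no headings here")                              # -> ""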

@@ -57,7 +57,7 @@ class PDFProcessor(BaseProcessor):
                 'figures': self._extract_figures(page, page_num, figure_counter),
                 'tables': self._extract_tables(page)
             }
-            self._log_tuning("_extract_content", {"page_num": page_num, "page_content": page_content})
+            self.log_tuning("_extract_content", {"page_num": page_num, "page_content": page_content})
             figure_counter += len(page_content['figures'])
             extracted_content.append(page_content)

@@ -119,7 +119,7 @@ class PDFProcessor(BaseProcessor):
                 markdown_table = self._table_to_markdown(table)
                 if markdown_table:  # Only add non-empty tables
                     tables.append(markdown_table)
-                    self._log_tuning("_extract_tables", {"markdown_table": markdown_table})
+                    self.log_tuning("_extract_tables", {"markdown_table": markdown_table})
         except Exception as e:
             self._log(f"Error extracting tables from page: {str(e)}", level='error')
         return tables
@@ -45,7 +45,7 @@ class TranscriptionBaseProcessor(BaseProcessor):
         return text_splitter.split_text(transcription)

     def _process_chunks(self, chunks):
-        self._log_tuning("_process_chunks", {"Nr of Chunks": len(chunks)})
+        self.log_tuning("_process_chunks", {"Nr of Chunks": len(chunks)})
         llm = self.model_variables.get_llm()
         template = self.model_variables.get_template('transcript')
         language_template = create_language_template(template, self.document_version.language)

@@ -64,7 +64,7 @@ class TranscriptionBaseProcessor(BaseProcessor):
             }
             markdown = chain.invoke(input_transcript)
             markdown = self._clean_markdown(markdown)
-            self._log_tuning("_process_chunks", {
+            self.log_tuning("_process_chunks", {
                 "Chunk Number": f"{i + 1} of {len(chunks)}",
                 "Chunk": chunk,
                 "Previous Chunk": previous_part,
@@ -1,3 +1,4 @@
+import re
 from datetime import datetime as dt, timezone as tz

 from celery import states

@@ -23,6 +24,8 @@ from common.utils.business_event_context import current_event
 from config.type_defs.processor_types import PROCESSOR_TYPES
 from eveai_workers.processors.processor_registry import ProcessorRegistry

+from common.utils.config_field_types import json_to_pattern_list
+

 # Healthcheck task
 @current_celery.task(name='ping', queue='embeddings')
@@ -99,9 +102,13 @@ def create_embeddings(tenant_id, document_version_id):
             processor=processor
         )
         markdown, title = document_processor.process()
+        document_processor.log_tuning("Processor returned: ", {
+            'markdown': markdown,
+            'title': title
+        })

         with current_event.create_span("Embedding"):
-            embed_markdown(tenant, model_variables, document_version, catalog, markdown, title)
+            embed_markdown(tenant, model_variables, document_version, catalog, document_processor, markdown, title)

         current_event.log("Finished Embedding Creation Task")
@@ -129,16 +136,19 @@ def delete_embeddings_for_document_version(document_version):
         raise


-def embed_markdown(tenant, model_variables, document_version, catalog, markdown, title):
+def embed_markdown(tenant, model_variables, document_version, catalog, processor, markdown, title):
     # Create potential chunks
-    potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, f"{document_version.id}.md")
+    potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, processor, markdown)
+    processor.log_tuning("Potential Chunks: ", {'potential chunks': potential_chunks})

     # Combine chunks for embedding
-    chunks = combine_chunks_for_markdown(potential_chunks, catalog.min_chunk_size, catalog.max_chunk_size)
+    chunks = combine_chunks_for_markdown(potential_chunks, catalog.min_chunk_size, catalog.max_chunk_size, processor)
+    processor.log_tuning("Chunks: ", {'chunks': chunks})

     # Enrich chunks
     with current_event.create_span("Enrich Chunks"):
         enriched_chunks = enrich_chunks(tenant, model_variables, document_version, title, chunks)
+        processor.log_tuning("Enriched Chunks: ", {'enriched_chunks': enriched_chunks})

     # Create embeddings
     with current_event.create_span("Create Embeddings"):
@@ -238,23 +248,17 @@ def embed_chunks(tenant, model_variables, document_version, chunks):
     return new_embeddings


-def create_potential_chunks_for_markdown(tenant_id, document_version, input_file):
+def create_potential_chunks_for_markdown(tenant_id, document_version, processor, markdown):
     try:
         current_app.logger.info(f'Creating potential chunks for tenant {tenant_id}')
-        markdown_on = document_version.object_name.rsplit('.', 1)[0] + '.md'
-
-        # Download the markdown file from MinIO
-        markdown_data = minio_client.download_document_file(tenant_id,
-                                                            document_version.bucket_name,
-                                                            markdown_on,
-                                                            )
-        markdown = markdown_data.decode('utf-8')
+        heading_level = processor.configuration.get('chunking_heading_level', 2)

         headers_to_split_on = [
-            ("#", "Header 1"),
-            ("##", "Header 2"),
+            (f"{'#' * i}", f"Header {i}") for i in range(1, min(heading_level + 1, 7))
         ]
+
+        processor.log_tuning('Headers to split on', {'header list: ': headers_to_split_on})

         markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on, strip_headers=False)
         md_header_splits = markdown_splitter.split_text(markdown)
         potential_chunks = [doc.page_content for doc in md_header_splits]
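With the default chunking_heading_level of 2, the comprehension above evaluates to [("#", "Header 1"), ("##", "Header 2")], i.e. exactly the list the old hardcoded version produced; with chunking_heading_level = 3 it also yields ("###", "Header 3"). The min(heading_level + 1, 7) guard caps the list at Markdown's six heading levels.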
@@ -265,14 +269,61 @@ def create_potential_chunks_for_markdown(tenant_id, document_version, input_file
         raise


-def combine_chunks_for_markdown(potential_chunks, min_chars, max_chars):
+def combine_chunks_for_markdown(potential_chunks, min_chars, max_chars, processor):
     actual_chunks = []
     current_chunk = ""
     current_length = 0

+    def matches_chunking_pattern(text, patterns):
+        if not patterns:
+            return False
+
+        # Get the first line of the text
+        first_line = text.split('\n', 1)[0].strip()
+
+        # Check if it's a header at appropriate level
+        header_match = re.match(r'^(#{1,6})\s+(.+)$', first_line)
+        if not header_match:
+            return False
+
+        # Get the heading level (number of #s)
+        header_level = len(header_match.group(1))
+        # Get the header text
+        header_text = header_match.group(2)
+
+        # Check if header matches any pattern
+        for pattern in patterns:
+            try:
+                processor.log_tuning('Pattern check: ', {
+                    'pattern: ': pattern,
+                    'text': header_text
+                })
+                if re.search(pattern, header_text, re.IGNORECASE):
+                    return True
+            except Exception as e:
+                current_app.logger.warning(f"Invalid regex pattern '{pattern}': {str(e)}")
+                continue
+
+        return False
+
+    chunking_patterns = json_to_pattern_list(processor.configuration.get('chunking_patterns', []))
+
+    processor.log_tuning(f'Chunking Patterns Extraction: ', {
+        'Full Configuration': processor.configuration,
+        'Chunking Patterns': chunking_patterns,
+    })
+
     for chunk in potential_chunks:
         chunk_length = len(chunk)

+        # Force new chunk if pattern matches
+        if chunking_patterns and matches_chunking_pattern(chunk, chunking_patterns):
+            if current_chunk and current_length >= min_chars:
+                actual_chunks.append(current_chunk)
+            current_chunk = chunk
+            current_length = chunk_length
+            continue
+
         if current_length + chunk_length > max_chars:
             if current_length >= min_chars:
                 actual_chunks.append(current_chunk)
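A quick walk-through of the pattern check above; the sample chunk and pattern are illustrative:

    import re

    chunk = "## Article 12: Data Retention\nBody text ..."
    first_line = chunk.split('\n', 1)[0].strip()    # "## Article 12: Data Retention"
    m = re.match(r'^(#{1,6})\s+(.+)$', first_line)
    m.group(1)    # "##" -> header_level 2 (computed but not yet used for filtering)
    m.group(2)    # "Article 12: Data Retention"
    re.search(r'^Article \d+', m.group(2), re.IGNORECASE)    # matches -> a new chunk is forced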