- Allow for more complex and longer PDFs to be uploaded to Evie. First implmentation of a processor for specific file types.

- Allow URLs to contain other information than just HTML information. It can alose refer to e.g. PDF-files.
This commit is contained in:
Josako
2024-08-27 07:05:56 +02:00
parent 2ca006d82c
commit 122d1a18df
9 changed files with 458 additions and 86 deletions

View File

@@ -0,0 +1,271 @@
import io
import pdfplumber
from flask import current_app
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
import re
from langchain_core.runnables import RunnablePassthrough
from common.extensions import minio_client
from common.utils.model_utils import create_language_template
class PDFProcessor:
def __init__(self, tenant, model_variables, document_version):
self.tenant = tenant
self.model_variables = model_variables
self.document_version = document_version
# Configuration parameters from model_variables
self.chunk_size = model_variables['PDF_chunk_size']
self.chunk_overlap = model_variables['PDF_chunk_overlap']
self.min_chunk_size = model_variables['PDF_min_chunk_size']
self.max_chunk_size = model_variables['PDF_max_chunk_size']
# Set tuning variable for easy use
self.embed_tuning = model_variables['embed_tuning']
def process_pdf(self):
self._log("Starting PDF processing")
try:
file_data = minio_client.download_document_file(
self.tenant.id,
self.document_version.doc_id,
self.document_version.language,
self.document_version.id,
self.document_version.file_name
)
extracted_content = self._extract_content(file_data)
structured_content, title = self._structure_content(extracted_content)
llm_chunks = self._split_content_for_llm(structured_content)
markdown = self._process_chunks_with_llm(llm_chunks)
self._save_markdown(markdown)
self._log("Finished processing PDF")
return markdown, title
except Exception as e:
self._log(f"Error processing PDF: {str(e)}", level='error')
raise
def _log(self, message, level='debug'):
logger = current_app.logger
log_method = getattr(logger, level)
log_method(f"PDFProcessor - Tenant {self.tenant.id}, Document {self.document_version.id}: {message}")
def _extract_content(self, file_data):
extracted_content = []
with pdfplumber.open(io.BytesIO(file_data)) as pdf:
figure_counter = 1
for page_num, page in enumerate(pdf.pages):
self._log(f"Extracting content from page {page_num + 1}")
page_content = {
'text': page.extract_text(),
'figures': self._extract_figures(page, page_num, figure_counter),
'tables': self._extract_tables(page)
}
if self.embed_tuning:
self._log(f'Extracted PDF Content for page {page_num + 1}')
self._log(f"{page_content }")
figure_counter += len(page_content['figures'])
extracted_content.append(page_content)
# if self.embed_tuning:
# current_app.embed_tuning_logger.debug(f'Extracted PDF Content')
# current_app.embed_tuning_logger.debug(f'---------------------')
# current_app.embed_tuning_logger.debug(f'Page: {page_content}')
# current_app.embed_tuning_logger.debug(f'End of Extracted PDF Content')
# current_app.embed_tuning_logger.debug(f'----------------------------')
return extracted_content
def _extract_figures(self, page, page_num, figure_counter):
figures = []
# Omit figure processing for now!
# for img in page.images:
# try:
# # Try to get the bbox, use full page dimensions if not available
# bbox = img.get('bbox', (0, 0, page.width, page.height))
#
# figure = {
# 'figure_number': figure_counter,
# 'filename': f"figure_{page_num + 1}_{figure_counter}.png",
# 'caption': self._find_figure_caption(page, bbox)
# }
#
# # Extract the figure as an image
# figure_image = page.within_bbox(bbox).to_image()
#
# # Save the figure using MinIO
# with io.BytesIO() as output:
# figure_image.save(output, format='PNG')
# output.seek(0)
# minio_client.upload_document_file(
# self.tenant.id,
# self.document_version.doc_id,
# self.document_version.language,
# self.document_version.id,
# figure['filename'],
# output.getvalue()
# )
#
# figures.append(figure)
# figure_counter += 1
# except Exception as e:
# self._log(f"Error processing figure on page {page_num + 1}: {str(e)}", level='error')
return figures
def _find_figure_caption(self, page, bbox):
try:
# Look for text below the figure
caption_bbox = (bbox[0], bbox[3], bbox[2], min(bbox[3] + 50, page.height))
caption_text = page.crop(caption_bbox).extract_text()
if caption_text and caption_text.lower().startswith('figure'):
return caption_text
except Exception as e:
self._log(f"Error finding figure caption: {str(e)}", level='error')
return None
def _extract_tables(self, page):
tables = []
try:
for table in page.extract_tables():
if table:
markdown_table = self._table_to_markdown(table)
if markdown_table: # Only add non-empty tables
tables.append(markdown_table)
except Exception as e:
self._log(f"Error extracting tables from page: {str(e)}", level='error')
return tables
def _table_to_markdown(self, table):
if not table or not table[0]: # Check if table is empty or first row is empty
return "" # Return empty string for empty tables
def clean_cell(cell):
if cell is None:
return "" # Convert None to empty string
return str(cell).replace("|", "\\|") # Escape pipe characters and convert to string
header = [clean_cell(cell) for cell in table[0]]
markdown = "| " + " | ".join(header) + " |\n"
markdown += "| " + " | ".join(["---"] * len(header)) + " |\n"
for row in table[1:]:
cleaned_row = [clean_cell(cell) for cell in row]
markdown += "| " + " | ".join(cleaned_row) + " |\n"
return markdown
def _structure_content(self, extracted_content):
structured_content = ""
title = "Untitled Document"
current_heading_level = 0
heading_pattern = re.compile(r'^(\d+(\.\d+)*\.?\s*)?(.+)$')
def identify_heading(text):
match = heading_pattern.match(text.strip())
if match:
numbering, _, content = match.groups()
if numbering:
level = numbering.count('.') + 1
return level, f"{numbering}{content}"
else:
return 1, content # Assume it's a top-level heading if no numbering
return 0, text # Not a heading
for page in extracted_content:
# Assume the title is on the first page
if page == extracted_content[0]:
lines = page.get('text', '').split('\n')
if lines:
title = lines[0].strip() # Use the first non-empty line as the title
# Process text
paragraphs = page['text'].split('\n\n')
for para in paragraphs:
lines = para.strip().split('\n')
if len(lines) == 1: # Potential heading
level, text = identify_heading(lines[0])
if level > 0:
heading_marks = '#' * level
structured_content += f"\n\n{heading_marks} {text}\n\n"
if level == 1 and not title:
title = text # Use the first top-level heading as the title if not set
else:
structured_content += f"{para}\n\n" # Treat as normal paragraph
else:
structured_content += f"{para}\n\n" # Multi-line paragraph
# Process figures
for figure in page.get('figures', []):
structured_content += f"\n\n![Figure {figure['figure_number']}]({figure['filename']})\n\n"
if figure['caption']:
structured_content += f"*Figure {figure['figure_number']}: {figure['caption']}*\n\n"
# Add tables
if 'tables' in page:
for table in page['tables']:
structured_content += f"\n{table}\n"
if self.embed_tuning:
self._save_intermediate(structured_content, "structured_content.md")
return structured_content, title
def _split_content_for_llm(self, content):
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=self.chunk_size,
chunk_overlap=self.chunk_overlap,
length_function=len,
separators=["\n\n", "\n", " ", ""]
)
return text_splitter.split_text(content)
def _process_chunks_with_llm(self, chunks):
llm = self.model_variables['llm']
template = self.model_variables['pdf_parse_template']
pdf_prompt = ChatPromptTemplate.from_template(template)
setup = RunnablePassthrough()
output_parser = StrOutputParser()
chain = setup | pdf_prompt | llm | output_parser
markdown_chunks = []
for chunk in chunks:
input = {"pdf_content": chunk}
result = chain.invoke(input)
# Remove Markdown code block delimiters if present
result = result.strip()
if result.startswith("```markdown"):
result = result[len("```markdown"):].strip()
if result.endswith("```"):
result = result[:-3].strip()
markdown_chunks.append(result)
return "\n\n".join(markdown_chunks)
def _save_markdown(self, markdown):
markdown_filename = f"{self.document_version.id}.md"
minio_client.upload_document_file(
self.tenant.id,
self.document_version.doc_id,
self.document_version.language,
self.document_version.id,
markdown_filename,
markdown.encode('utf-8')
)
def _save_intermediate(self, content, filename):
minio_client.upload_document_file(
self.tenant.id,
self.document_version.doc_id,
self.document_version.language,
self.document_version.id,
filename,
content.encode('utf-8')
)

View File

@@ -29,6 +29,7 @@ from common.utils.celery_utils import current_celery
from common.utils.database import Database
from common.utils.model_utils import select_model_variables, create_language_template
from common.utils.os_utils import safe_remove, sync_folder
from eveai_workers.Processors.PDF_Processor import PDFProcessor
@current_celery.task(name='create_embeddings', queue='embeddings')
@@ -103,34 +104,67 @@ def create_embeddings(tenant_id, document_version_id):
raise
# def process_pdf(tenant, model_variables, document_version):
# file_data = minio_client.download_document_file(tenant.id, document_version.doc_id, document_version.language,
# document_version.id, document_version.file_name)
#
# pdf_text = ''
# pdf_reader = PyPDF2.PdfReader(io.BytesIO(file_data))
# for page in pdf_reader.pages:
# pdf_text += page.extract_text()
#
# markdown = generate_markdown_from_pdf(tenant, model_variables, document_version, pdf_text)
# markdown_file_name = f'{document_version.id}.md'
# minio_client.upload_document_file(tenant.id, document_version.doc_id, document_version.language,
# document_version.id,
# markdown_file_name, markdown.encode())
#
# potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, markdown_file_name)
# chunks = combine_chunks_for_markdown(potential_chunks, model_variables['min_chunk_size'],
# model_variables['max_chunk_size'])
#
# if len(chunks) > 1:
# summary = summarize_chunk(tenant, model_variables, document_version, chunks[0])
# document_version.system_context = f'Summary: {summary}\n'
# else:
# document_version.system_context = ''
#
# enriched_chunks = enrich_chunks(tenant, document_version, chunks)
# embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks)
#
# try:
# db.session.add(document_version)
# document_version.processing_finished_at = dt.now(tz.utc)
# document_version.processing = False
# db.session.add_all(embeddings)
# db.session.commit()
# except SQLAlchemyError as e:
# current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} '
# f'on HTML, document version {document_version.id}'
# f'error: {e}')
# raise
#
# current_app.logger.info(f'Embeddings created successfully for tenant {tenant.id} '
# f'on document version {document_version.id} :-)')
def process_pdf(tenant, model_variables, document_version):
file_data = minio_client.download_document_file(tenant.id, document_version.doc_id, document_version.language,
document_version.id, document_version.file_name)
processor = PDFProcessor(tenant, model_variables, document_version)
markdown, title = processor.process_pdf()
pdf_text = ''
pdf_reader = PyPDF2.PdfReader(io.BytesIO(file_data))
for page in pdf_reader.pages:
pdf_text += page.extract_text()
# Create potential chunks for embedding
potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, f"{document_version.id}.md")
markdown = generate_markdown_from_pdf(tenant, model_variables, document_version, pdf_text)
markdown_file_name = f'{document_version.id}.md'
minio_client.upload_document_file(tenant.id, document_version.doc_id, document_version.language,
document_version.id,
markdown_file_name, markdown.encode())
potential_chunks = create_potential_chunks_for_markdown(tenant.id, document_version, markdown_file_name)
# Combine chunks for embedding
chunks = combine_chunks_for_markdown(potential_chunks, model_variables['min_chunk_size'],
model_variables['max_chunk_size'])
if len(chunks) > 1:
summary = summarize_chunk(tenant, model_variables, document_version, chunks[0])
document_version.system_context = f'Summary: {summary}\n'
else:
document_version.system_context = ''
# Enrich chunks
enriched_chunks = enrich_chunks(tenant, document_version, title, chunks)
enriched_chunks = enrich_chunks(tenant, document_version, chunks)
# Create embeddings
embeddings = embed_chunks(tenant, model_variables, document_version, enriched_chunks)
# Update document version and save embeddings
try:
db.session.add(document_version)
document_version.processing_finished_at = dt.now(tz.utc)
@@ -139,7 +173,7 @@ def process_pdf(tenant, model_variables, document_version):
db.session.commit()
except SQLAlchemyError as e:
current_app.logger.error(f'Error saving embedding information for tenant {tenant.id} '
f'on HTML, document version {document_version.id}'
f'on PDF, document version {document_version.id}'
f'error: {e}')
raise
@@ -238,26 +272,6 @@ def enrich_chunks(tenant, document_version, title, chunks):
return enriched_chunks
# def generate_markdown_from_html(tenant, model_variables, document_version, html_content):
# current_app.logger.debug(f'Generating markdown from HTML for tenant {tenant.id} '
# f'on document version {document_version.id}')
# llm = model_variables['llm']
# template = model_variables['html_parse_template']
# parse_prompt = ChatPromptTemplate.from_template(template)
# setup = RunnablePassthrough()
# output_parser = StrOutputParser()
#
# chain = setup | parse_prompt | llm | output_parser
# input_html = {"html": html_content}
#
# markdown = chain.invoke(input_html)
#
# current_app.logger.debug(f'Finished generating markdown from HTML for tenant {tenant.id} '
# f'on document version {document_version.id}')
#
# return markdown
def generate_markdown_from_html(tenant, model_variables, document_version, html_content):
current_app.logger.debug(f'Generating markdown from HTML for tenant {tenant.id} '
f'on document version {document_version.id}')
@@ -765,4 +779,4 @@ def combine_chunks_for_markdown(potential_chunks, min_chars, max_chars):
actual_chunks.append(current_chunk)
return actual_chunks
pass