import io
import re

import pdfplumber
from flask import current_app
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough

from common.extensions import minio_client
from common.utils.business_event_context import current_event
from common.utils.model_utils import create_language_template
from .processor import Processor

class PDFProcessor(Processor):
    """Convert a stored PDF document into structured markdown.

    Pipeline: download the file from MinIO, extract text/figures/tables per
    page with pdfplumber, heuristically structure the text into markdown
    headings and paragraphs, then clean each chunk up with an LLM chain.
    Helper methods `_log`, `_save_markdown`, `_save_intermediate` and
    `_clean_markdown` are provided by the `Processor` base class.
    """

    def __init__(self, tenant, model_variables, document_version):
        super().__init__(tenant, model_variables, document_version)
        # PDF-specific chunking configuration used by the text splitter.
        self.chunk_size = model_variables['processing_chunk_size']
        self.chunk_overlap = model_variables['processing_chunk_overlap']
        self.min_chunk_size = model_variables['processing_min_chunk_size']
        self.max_chunk_size = model_variables['processing_max_chunk_size']

    def process(self):
        """Run the full PDF -> markdown pipeline.

        Returns:
            tuple[str, str]: the generated markdown and the document title.

        Raises:
            Exception: any failure is logged at error level and re-raised.
        """
        self._log("Starting PDF processing")
        try:
            file_data = minio_client.download_document_file(
                self.tenant.id,
                self.document_version.doc_id,
                self.document_version.language,
                self.document_version.id,
                self.document_version.file_name
            )

            with current_event.create_span("PDF Extraction"):
                extracted_content = self._extract_content(file_data)
                structured_content, title = self._structure_content(extracted_content)

            with current_event.create_span("Markdown Generation"):
                llm_chunks = self._split_content_for_llm(structured_content)
                markdown = self._process_chunks_with_llm(llm_chunks)
                self._save_markdown(markdown)
            self._log("Finished processing PDF")
            return markdown, title
        except Exception as e:
            self._log(f"Error processing PDF: {str(e)}", level='error')
            raise

    def _extract_content(self, file_data):
        """Extract text, figures and tables from every page of the PDF.

        Args:
            file_data (bytes): raw PDF file contents.

        Returns:
            list[dict]: one dict per page with keys 'text' (str),
            'figures' (list) and 'tables' (list of markdown strings).
        """
        extracted_content = []
        with pdfplumber.open(io.BytesIO(file_data)) as pdf:
            figure_counter = 1
            for page_num, page in enumerate(pdf.pages):
                self._log(f"Extracting content from page {page_num + 1}")
                page_content = {
                    # extract_text() may return None (e.g. scanned pages);
                    # normalize to "" so downstream string handling is safe.
                    'text': page.extract_text() or '',
                    'figures': self._extract_figures(page, page_num, figure_counter),
                    'tables': self._extract_tables(page)
                }
                if self.embed_tuning:
                    self._log(f'Extracted PDF Content for page {page_num + 1}')
                    self._log(f"{page_content }")
                figure_counter += len(page_content['figures'])
                extracted_content.append(page_content)

        return extracted_content

    def _extract_figures(self, page, page_num, figure_counter):
        """Extract figures from a page.

        Figure processing is intentionally disabled for now; an empty list
        is always returned so page assembly and figure numbering still work.
        The (page, page_num, figure_counter) signature is kept for when
        extraction is re-enabled.
        """
        return []

    def _find_figure_caption(self, page, bbox):
        """Search the 50pt strip below a figure's bbox for its caption.

        Args:
            page: pdfplumber page object.
            bbox: (x0, top, x1, bottom) bounding box of the figure.

        Returns:
            str | None: the caption if text directly below the figure
            starts with "figure" (case-insensitive), otherwise None.
        """
        try:
            # Crop the area just below the figure, clamped to the page bottom.
            caption_bbox = (bbox[0], bbox[3], bbox[2], min(bbox[3] + 50, page.height))
            caption_text = page.crop(caption_bbox).extract_text()
            if caption_text and caption_text.lower().startswith('figure'):
                return caption_text
        except Exception as e:
            self._log(f"Error finding figure caption: {str(e)}", level='error')
        return None

    def _extract_tables(self, page):
        """Extract the page's tables and render each one as markdown.

        Returns:
            list[str]: non-empty markdown tables; extraction errors are
            logged and yield an empty (or partial) list rather than raising.
        """
        tables = []
        try:
            for table in page.extract_tables():
                if table:
                    markdown_table = self._table_to_markdown(table)
                    if markdown_table:  # Only add non-empty tables
                        tables.append(markdown_table)
        except Exception as e:
            self._log(f"Error extracting tables from page: {str(e)}", level='error')
        return tables

    def _table_to_markdown(self, table):
        """Convert a pdfplumber table (list of row lists) to markdown.

        The first row becomes the header. None cells become empty strings;
        pipe characters are escaped and embedded newlines flattened so a
        multi-line cell cannot break the row layout.

        Returns:
            str: the markdown table, or "" for an empty table.
        """
        if not table or not table[0]:  # Empty table or empty first row
            return ""

        def clean_cell(cell):
            if cell is None:
                return ""
            # A raw "|" or "\n" inside a cell would corrupt the table.
            return str(cell).replace("|", "\\|").replace("\n", " ")

        header = [clean_cell(cell) for cell in table[0]]
        lines = [
            "| " + " | ".join(header) + " |",
            "| " + " | ".join(["---"] * len(header)) + " |",
        ]
        for row in table[1:]:
            lines.append("| " + " | ".join(clean_cell(cell) for cell in row) + " |")

        return "\n".join(lines) + "\n"

    def _structure_content(self, extracted_content):
        """Assemble per-page extraction results into one markdown document.

        Single-line paragraphs are treated as headings: numbered ones get a
        depth derived from their numbering ("2" -> level 1, "2.1" -> level 2),
        unnumbered ones are assumed top-level. Figures become image links and
        tables are appended verbatim.

        Args:
            extracted_content (list[dict]): output of _extract_content.

        Returns:
            tuple[str, str]: (structured markdown, document title).
        """
        structured_content = ""
        title = "Untitled Document"
        heading_pattern = re.compile(r'^(\d+(\.\d+)*\.?\s*)?(.+)$')

        def identify_heading(text):
            """Return (level, text); level 0 means 'not a heading'."""
            match = heading_pattern.match(text.strip())
            if match:
                numbering, _, content = match.groups()
                if numbering:
                    # Depth = number of dot-separated components. Strip the
                    # trailing dot first so "1." is level 1, not level 2.
                    level = numbering.strip().rstrip('.').count('.') + 1
                    return level, f"{numbering}{content}"
                else:
                    return 1, content  # Assume top-level when unnumbered
            return 0, text  # Not a heading

        for page_index, page in enumerate(extracted_content):
            text = page.get('text', '') or ''

            # Assume the title is the first non-empty line of the first page.
            if page_index == 0:
                for line in text.split('\n'):
                    if line.strip():
                        title = line.strip()
                        break

            # Process text paragraph by paragraph.
            for para in text.split('\n\n'):
                lines = para.strip().split('\n')
                if len(lines) == 1:  # Potential heading
                    level, heading_text = identify_heading(lines[0])
                    if level > 0:
                        structured_content += f"\n\n{'#' * level} {heading_text}\n\n"
                        if level == 1 and title == "Untitled Document":
                            # Fall back to the first top-level heading.
                            title = heading_text
                    else:
                        structured_content += f"{para}\n\n"  # Normal paragraph
                else:
                    structured_content += f"{para}\n\n"  # Multi-line paragraph

            # Process figures.
            for figure in page.get('figures', []):
                structured_content += f"\n\n![Figure {figure['figure_number']}]({figure['filename']})\n\n"
                if figure['caption']:
                    structured_content += f"*Figure {figure['figure_number']}: {figure['caption']}*\n\n"

            # Add tables.
            for table in page.get('tables', []):
                structured_content += f"\n{table}\n"

        if self.embed_tuning:
            self._save_intermediate(structured_content, "structured_content.md")

        return structured_content, title

    def _split_content_for_llm(self, content):
        """Split markdown into overlapping chunks sized for the LLM prompt."""
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            length_function=len,
            separators=["\n\n", "\n", " ", ""]
        )
        return text_splitter.split_text(content)

    def _process_chunks_with_llm(self, chunks):
        """Run each chunk through the pdf-parse prompt chain and join results.

        Args:
            chunks (list[str]): chunks produced by _split_content_for_llm.

        Returns:
            str: cleaned markdown chunks joined by blank lines.
        """
        llm = self.model_variables['llm']
        template = self.model_variables['pdf_parse_template']
        pdf_prompt = ChatPromptTemplate.from_template(template)
        chain = RunnablePassthrough() | pdf_prompt | llm | StrOutputParser()

        markdown_chunks = []
        for chunk in chunks:
            # Named 'payload' (not 'input') to avoid shadowing the builtin.
            payload = {"pdf_content": chunk}
            result = self._clean_markdown(chain.invoke(payload))
            markdown_chunks.append(result)

        return "\n\n".join(markdown_chunks)