import io
import re

import pdfplumber
from flask import current_app
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough

from common.extensions import minio_client
from common.utils.model_utils import create_language_template
from .processor import Processor
from common.utils.business_event_context import current_event


class PDFProcessor(Processor):
    """Turn a stored PDF document version into markdown.

    The pipeline is: download the file, extract per-page text/tables with
    pdfplumber, assemble a rough markdown-like string, split that string into
    chunks and run every chunk through the configured LLM chain.
    """

    # Optional dotted numbering ("1", "1.2", "1.2.") followed by the heading text.
    _HEADING_PATTERN = re.compile(r'^(\d+(\.\d+)*\.?\s*)?(.+)$')

    def __init__(self, tenant, model_variables, document_version):
        """Initialise the base processor and read the chunking settings.

        Args:
            tenant: Forwarded to ``Processor.__init__``; ``tenant.id`` is used
                when downloading the file.
            model_variables: Mapping that must contain the four
                ``processing_*`` chunk-size keys read below; ``llm`` and
                ``pdf_parse_template`` are read later during processing.
            document_version: Forwarded to ``Processor.__init__``; identifies
                the file to download.

        Raises:
            KeyError: If one of the ``processing_*`` keys is missing.
        """
        super().__init__(tenant, model_variables, document_version)
        # PDF-specific initialization
        self.chunk_size = model_variables['processing_chunk_size']
        self.chunk_overlap = model_variables['processing_chunk_overlap']
        self.min_chunk_size = model_variables['processing_min_chunk_size']
        self.max_chunk_size = model_variables['processing_max_chunk_size']

    def process(self):
        """Run the full PDF-to-markdown pipeline.

        Returns:
            tuple: ``(markdown, title)`` where ``markdown`` is the joined LLM
            output and ``title`` is the detected document title.

        Raises:
            Exception: Any error raised by a pipeline step is logged at
                ``error`` level and re-raised unchanged.
        """
        self._log("Starting PDF processing")
        try:
            file_data = minio_client.download_document_file(
                self.tenant.id,
                self.document_version.doc_id,
                self.document_version.language,
                self.document_version.id,
                self.document_version.file_name
            )

            with current_event.create_span("PDF Extraction"):
                extracted_content = self._extract_content(file_data)
                structured_content, title = self._structure_content(extracted_content)

            with current_event.create_span("Markdown Generation"):
                llm_chunks = self._split_content_for_llm(structured_content)
                markdown = self._process_chunks_with_llm(llm_chunks)
                self._save_markdown(markdown)

            self._log("Finished processing PDF")
            return markdown, title
        except Exception as e:
            self._log(f"Error processing PDF: {str(e)}", level='error')
            raise

    def _extract_content(self, file_data):
        """Extract text, figures and tables from every page of the PDF.

        Args:
            file_data: Raw PDF content; it is wrapped in ``io.BytesIO`` before
                being handed to pdfplumber.

        Returns:
            list[dict]: One dict per page with the keys ``'text'``,
            ``'figures'`` and ``'tables'``.
        """
        extracted_content = []
        with pdfplumber.open(io.BytesIO(file_data)) as pdf:
            figure_counter = 1
            for page_num, page in enumerate(pdf.pages):
                self._log(f"Extracting content from page {page_num + 1}")
                page_content = {
                    'text': page.extract_text(),
                    'figures': self._extract_figures(page, page_num, figure_counter),
                    'tables': self._extract_tables(page)
                }
                if self.embed_tuning:
                    self._log(f'Extracted PDF Content for page {page_num + 1}')
                    self._log(f"{page_content}")
                # Keep figure numbering continuous across pages.
                figure_counter += len(page_content['figures'])
                extracted_content.append(page_content)

        return extracted_content

    def _extract_figures(self, page, page_num, figure_counter):
        """Return the figures found on ``page``.

        Figure processing is currently disabled, so this always returns an
        empty list; the intended implementation is kept below for reference.

        Args:
            page: pdfplumber page object (unused while disabled).
            page_num: Zero-based page index (unused while disabled).
            figure_counter: Number to assign to the next figure (unused while
                disabled).

        Returns:
            list: Always ``[]`` for now.
        """
        figures = []
        # Omit figure processing for now!
        # for img in page.images:
        #     try:
        #         # Try to get the bbox, use full page dimensions if not available
        #         bbox = img.get('bbox', (0, 0, page.width, page.height))
        #
        #         figure = {
        #             'figure_number': figure_counter,
        #             'filename': f"figure_{page_num + 1}_{figure_counter}.png",
        #             'caption': self._find_figure_caption(page, bbox)
        #         }
        #
        #         # Extract the figure as an image
        #         figure_image = page.within_bbox(bbox).to_image()
        #
        #         # Save the figure using MinIO
        #         with io.BytesIO() as output:
        #             figure_image.save(output, format='PNG')
        #             output.seek(0)
        #             minio_client.upload_document_file(
        #                 self.tenant.id,
        #                 self.document_version.doc_id,
        #                 self.document_version.language,
        #                 self.document_version.id,
        #                 figure['filename'],
        #                 output.getvalue()
        #             )
        #
        #         figures.append(figure)
        #         figure_counter += 1
        #     except Exception as e:
        #         self._log(f"Error processing figure on page {page_num + 1}: {str(e)}", level='error')

        return figures

    def _find_figure_caption(self, page, bbox):
        """Look for a caption in the strip directly below a figure.

        Args:
            page: pdfplumber page object providing ``crop`` and ``height``.
            bbox: Figure bounding box, indexed as ``bbox[0]..bbox[3]``.

        Returns:
            The text found below the figure when it starts with "figure"
            (case-insensitive); otherwise ``None``. Errors are logged and also
            result in ``None``.
        """
        try:
            # Look for text below the figure (at most 50 units, clipped to the page)
            caption_bbox = (bbox[0], bbox[3], bbox[2], min(bbox[3] + 50, page.height))
            caption_text = page.crop(caption_bbox).extract_text()
            if caption_text and caption_text.lower().startswith('figure'):
                return caption_text
        except Exception as e:
            self._log(f"Error finding figure caption: {str(e)}", level='error')
        return None

    def _extract_tables(self, page):
        """Convert every non-empty table on ``page`` to a markdown table.

        Extraction is best-effort: an error is logged and whatever tables were
        converted before the failure are still returned.

        Args:
            page: pdfplumber page object providing ``extract_tables``.

        Returns:
            list[str]: Markdown tables, empty tables skipped.
        """
        tables = []
        try:
            for table in page.extract_tables():
                if not table:
                    continue
                markdown_table = self._table_to_markdown(table)
                if markdown_table:  # Only add non-empty tables
                    tables.append(markdown_table)
        except Exception as e:
            self._log(f"Error extracting tables from page: {str(e)}", level='error')
        return tables

    def _table_to_markdown(self, table):
        """Render a table (list of rows of cells) as a markdown pipe table.

        The first row is used as the header. ``None`` cells become empty
        strings, pipe characters are escaped, and newlines inside a cell are
        replaced by a space because a raw newline would terminate the
        markdown table row.

        Args:
            table: Sequence of rows, each row a sequence of cell values.

        Returns:
            str: The markdown table, or ``""`` when the table or its first row
            is empty.
        """
        if not table or not table[0]:
            return ""

        def clean_cell(cell):
            if cell is None:
                return ""
            return str(cell).replace("|", "\\|").replace("\n", " ")

        def format_row(cells):
            return "| " + " | ".join(cells) + " |\n"

        header = [clean_cell(cell) for cell in table[0]]
        rows = [format_row(header), format_row(["---"] * len(header))]
        rows.extend(format_row([clean_cell(cell) for cell in row]) for row in table[1:])
        return "".join(rows)

    @classmethod
    def _identify_heading(cls, text):
        """Classify a single line as a heading.

        Args:
            text: The candidate line.

        Returns:
            tuple: ``(level, heading_text)``. ``level`` is 0 when the stripped
            line is empty (not a heading), 1 for an unnumbered line, and the
            depth of the dotted numbering otherwise ("1." -> 1, "1.2" -> 2).
        """
        match = cls._HEADING_PATTERN.match(text.strip())
        if not match:
            return 0, text  # Not a heading
        numbering, _, content = match.groups()
        if not numbering:
            return 1, content  # Assume it's a top-level heading if no numbering
        # A trailing dot ("1." / "1.2.") is punctuation, not an extra level.
        level = numbering.strip().rstrip('.').count('.') + 1
        return level, f"{numbering}{content}"

    def _structure_content(self, extracted_content):
        """Assemble the per-page extraction result into one markdown-like string.

        Single-line paragraphs are treated as headings, multi-line paragraphs
        are copied as-is, and each page's figures and tables are appended after
        its text.

        Args:
            extracted_content: List of page dicts as produced by
                ``_extract_content``.

        Returns:
            tuple: ``(structured_content, title)``. The title is the first
            non-empty line of the first page, else the first top-level heading,
            else ``"Untitled Document"``.
        """
        parts = []
        title = None

        for page_index, page in enumerate(extracted_content):
            # 'text' may be None when no text was extracted; treat it as empty.
            page_text = page.get('text') or ''

            # Assume the title is on the first page
            if page_index == 0:
                title = next(
                    (line.strip() for line in page_text.split('\n') if line.strip()),
                    None
                )

            # Process text
            for para in page_text.split('\n\n'):
                lines = para.strip().split('\n')
                level, text = self._identify_heading(lines[0]) if len(lines) == 1 else (0, para)
                if level > 0:
                    heading_marks = '#' * level
                    parts.append(f"\n\n{heading_marks} {text}\n\n")
                    if level == 1 and not title:
                        title = text  # Use the first top-level heading as the title if not set
                else:
                    parts.append(f"{para}\n\n")  # Normal or multi-line paragraph

            # Process figures
            for figure in page.get('figures', []):
                parts.append(f"\n\n![Figure {figure['figure_number']}]({figure['filename']})\n\n")
                if figure['caption']:
                    parts.append(f"*Figure {figure['figure_number']}: {figure['caption']}*\n\n")

            # Add tables
            for table in page.get('tables', []):
                parts.append(f"\n{table}\n")

        structured_content = "".join(parts)

        if self.embed_tuning:
            self._save_intermediate(structured_content, "structured_content.md")

        return structured_content, title or "Untitled Document"

    def _split_content_for_llm(self, content):
        """Split ``content`` into chunks sized for the LLM.

        Uses ``self.chunk_size`` and ``self.chunk_overlap`` and prefers to
        split on blank lines, then newlines, then spaces.

        Args:
            content: The structured text to split.

        Returns:
            list[str]: The chunks produced by the text splitter.
        """
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            length_function=len,
            separators=["\n\n", "\n", " ", ""]
        )
        return text_splitter.split_text(content)

    def _process_chunks_with_llm(self, chunks):
        """Run every chunk through the PDF-parsing LLM chain.

        The chain is built from ``model_variables['pdf_parse_template']`` and
        ``model_variables['llm']``; each chunk is passed as ``pdf_content`` and
        the result is cleaned with ``_clean_markdown``.

        Args:
            chunks: Iterable of text chunks.

        Returns:
            str: The cleaned results joined by blank lines.
        """
        llm = self.model_variables['llm']
        template = self.model_variables['pdf_parse_template']
        pdf_prompt = ChatPromptTemplate.from_template(template)
        chain = RunnablePassthrough() | pdf_prompt | llm | StrOutputParser()

        markdown_chunks = [
            self._clean_markdown(chain.invoke({"pdf_content": chunk}))
            for chunk in chunks
        ]
        return "\n\n".join(markdown_chunks)