Files
eveAI/eveai_workers/Processors/html_processor.py
2024-10-17 10:31:13 +02:00

149 lines
6.3 KiB
Python

import html

from bs4 import BeautifulSoup
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough

from common.extensions import db, minio_client
from common.utils.business_event_context import current_event
from common.utils.model_utils import create_language_template

from .processor import Processor
class HTMLProcessor(Processor):
    """Convert a stored HTML document version into markdown.

    The pipeline implemented by :meth:`process` is:

    1. download the raw HTML file through ``minio_client``;
    2. reduce it to a flat sequence of ``<tag>text</tag>`` lines, keeping only
       the configured tags and dropping excluded elements/classes;
    3. split that simplified HTML into chunks and let the configured LLM chain
       turn each chunk into markdown;
    4. store the joined markdown via ``_save_markdown``.

    ``_log``, ``_save_markdown``, ``tenant``, ``document_version``,
    ``model_variables`` and ``embed_tuning`` are not defined in this class;
    presumably they come from the ``Processor`` base class.
    """

    def __init__(self, tenant, model_variables, document_version):
        """Read the HTML-processing settings out of ``model_variables``.

        Args:
            tenant: Forwarded to ``Processor.__init__``; its ``id`` is used for
                logging and for the file download.
            model_variables: Mapping that must contain the keys ``html_tags``,
                ``html_end_tags``, ``html_included_elements``,
                ``html_excluded_elements``, ``html_excluded_classes``,
                ``processing_chunk_size`` and ``processing_chunk_overlap``.
            document_version: Forwarded to ``Processor.__init__``; its
                ``bucket_name`` and ``object_name`` locate the HTML file.

        Raises:
            KeyError: If one of the required settings is missing.
        """
        super().__init__(tenant, model_variables, document_version)
        self.html_tags = model_variables['html_tags']
        # Stored for completeness; not read by any method of this class.
        self.html_end_tags = model_variables['html_end_tags']
        self.html_included_elements = model_variables['html_included_elements']
        self.html_excluded_elements = model_variables['html_excluded_elements']
        self.html_excluded_classes = model_variables['html_excluded_classes']
        # Adjust this based on your LLM's optimal input size
        self.chunk_size = model_variables['processing_chunk_size']
        # Adjust for context preservation between chunks
        # (stored only; the chunking in this class does not apply an overlap).
        self.chunk_overlap = model_variables['processing_chunk_overlap']

    def process(self):
        """Run the full HTML-to-markdown pipeline for this document version.

        Returns:
            A ``(markdown, title)`` tuple: the generated markdown and the text
            of the document's ``<title>`` element ('' when there is none).

        Raises:
            Exception: Any failure is logged at error level and re-raised
                unchanged.
        """
        self._log("Starting HTML processing")
        try:
            file_data = minio_client.download_document_file(
                self.tenant.id,
                self.document_version.bucket_name,
                self.document_version.object_name,
            )
            html_content = file_data.decode('utf-8')

            with current_event.create_span("HTML Content Extraction"):
                extracted_html, title = self._parse_html(html_content)

            with current_event.create_span("Markdown Generation"):
                markdown = self._generate_markdown_from_html(extracted_html)

            self._save_markdown(markdown)
            self._log("Finished processing HTML")
            return markdown, title
        except Exception as e:
            self._log(f"Error processing HTML: {str(e)}", level='error')
            raise

    def _parse_html(self, html_content):
        """Reduce raw HTML to simplified HTML containing only wanted tags.

        Args:
            html_content: The document's HTML as text.

        Returns:
            A ``(extracted_html, title)`` tuple. ``extracted_html`` is the
            concatenation of one ``<tag>text</tag>`` line per retained
            element; ``title`` is the stripped ``<title>`` text or ''.
        """
        self._log(f'Parsing HTML for tenant {self.tenant.id}')
        soup = BeautifulSoup(html_content, 'html.parser')
        excluded_classes = self._parse_excluded_classes(self.html_excluded_classes)

        # Restrict the search to the configured container elements when any
        # are given; otherwise search the whole document.
        if self.html_included_elements:
            elements_to_parse = soup.find_all(self.html_included_elements)
        else:
            elements_to_parse = [soup]

        # Collect fragments and join once instead of growing a string with +=.
        fragments = []
        for element in elements_to_parse:
            for sub_element in element.find_all(self.html_tags):
                if self._should_exclude_element(sub_element, excluded_classes):
                    continue
                fragments.append(self._extract_element_content(sub_element))
        extracted_html = ''.join(fragments)

        # Look the title up once rather than once for the test and once for
        # the value.
        title_tag = soup.find('title')
        title = title_tag.get_text(strip=True) if title_tag else ''

        self._log(f'Finished parsing HTML for tenant {self.tenant.id}')
        return extracted_html, title

    def _generate_markdown_from_html(self, html_content):
        """Convert simplified HTML to markdown, one chunk at a time.

        Builds a chain from the ``html_parse_template`` prompt and the ``llm``
        found in ``self.model_variables``, invokes it once per chunk with
        ``{"html": chunk}``, and joins the results with blank lines.

        Args:
            html_content: Simplified HTML as produced by :meth:`_parse_html`.

        Returns:
            The markdown of all chunks joined by ``"\\n\\n"``.
        """
        self._log(f'Generating markdown from HTML for tenant {self.tenant.id}')

        llm = self.model_variables['llm']
        template = self.model_variables['html_parse_template']
        parse_prompt = ChatPromptTemplate.from_template(template)
        setup = RunnablePassthrough()
        output_parser = StrOutputParser()

        chain = setup | parse_prompt | llm | output_parser

        soup = BeautifulSoup(html_content, 'lxml')
        chunks = self._split_content(soup, self.chunk_size)

        markdown_chunks = []
        for chunk in chunks:
            # Verbose per-chunk logging only when embed tuning is switched on.
            if self.embed_tuning:
                self._log(f'Processing chunk: \n{chunk}\n')
            input_html = {"html": chunk}
            markdown_chunk = chain.invoke(input_html)
            markdown_chunks.append(markdown_chunk)
            if self.embed_tuning:
                self._log(f'Processed markdown chunk: \n{markdown_chunk}\n')

        markdown = "\n\n".join(markdown_chunks)
        self._log(f'Finished generating markdown from HTML for tenant {self.tenant.id}')
        return markdown

    def _split_content(self, soup, max_size=20000):
        """Group the document's elements into chunks of serialised HTML.

        Elements are appended to the current chunk until adding the next one
        would push the chunk's character count past ``max_size``; the chunk is
        then closed and a new one started. A single element is never split,
        so a chunk holding one oversized element can exceed ``max_size``.

        Args:
            soup: Parsed document (or tag) to walk.
            max_size: Target upper bound, in characters of serialised HTML,
                for one chunk.

        Returns:
            A list of HTML strings, in document order.
        """
        chunks = []
        current_chunk = []
        current_size = 0

        # NOTE(review): only these tags are visited. If the simplified HTML
        # contains other tags (e.g. because ``html_tags`` lists ``li``), their
        # content would not end up in any chunk - confirm against the config.
        for element in soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'div', 'span', 'table']):
            # Serialise once and keep the string; it is needed both for the
            # size bookkeeping and for the chunk text.
            element_html = str(element)
            element_size = len(element_html)

            if current_size + element_size > max_size and current_chunk:
                chunks.append(''.join(current_chunk))
                current_chunk = []
                current_size = 0

            current_chunk.append(element_html)
            current_size += element_size

            # Because of the flush above, this is only reached with
            # current_size > max_size when the heading by itself is larger
            # than max_size; it is then emitted as its own chunk right away.
            if element.name in ['h1', 'h2', 'h3'] and current_size > max_size:
                chunks.append(''.join(current_chunk))
                current_chunk = []
                current_size = 0

        if current_chunk:
            chunks.append(''.join(current_chunk))

        return chunks

    def _parse_excluded_classes(self, excluded_classes):
        """Turn ``element.class`` rules into a lookup table.

        Each rule is split on its first ``.`` into an element name and a class
        name. Surrounding whitespace is ignored and blank entries are skipped.

        Args:
            excluded_classes: Iterable of rule strings such as ``'div.nav'``
                or ``'*.hidden'``, or a falsy value for "no rules".

        Returns:
            A dict mapping each element name to the set of class names
            excluded for it.

        Raises:
            ValueError: If a non-blank rule contains no ``.`` separator.
        """
        parsed = {}
        for rule in excluded_classes or ():
            rule = rule.strip()
            if not rule:
                # A blank entry carries no rule; ignore it.
                continue
            element, separator, cls = rule.partition('.')
            if not separator:
                # Fail with a message that names the offending rule instead
                # of an opaque tuple-unpacking error.
                raise ValueError(
                    f"Invalid excluded class rule {rule!r}: expected 'element.class'"
                )
            parsed.setdefault(element.strip(), set()).add(cls.strip())
        return parsed

    def _should_exclude_element(self, element, excluded_classes):
        """Return True when ``element`` must be left out of the extraction.

        An element is excluded when it sits inside one of the configured
        ``html_excluded_elements``, or when it or any ancestor matches an
        excluded-class rule.

        Args:
            element: The candidate tag.
            excluded_classes: Table produced by :meth:`_parse_excluded_classes`.
        """
        if self.html_excluded_elements and element.find_parent(self.html_excluded_elements):
            return True
        return self._is_element_excluded_by_class(element, excluded_classes)

    def _is_element_excluded_by_class(self, element, excluded_classes):
        """Return True when ``element`` or any ancestor matches a class rule.

        Args:
            element: The candidate tag.
            excluded_classes: Table produced by :meth:`_parse_excluded_classes`.
        """
        for parent in element.parents:
            if self._element_matches_exclusion(parent, excluded_classes):
                return True
        return self._element_matches_exclusion(element, excluded_classes)

    def _element_matches_exclusion(self, element, excluded_classes):
        """Return True when one of ``element``'s classes is excluded.

        A class is excluded for this element when it is listed under the
        wildcard key ``'*'`` or under the element's own tag name.

        Args:
            element: The tag to test.
            excluded_classes: Table produced by :meth:`_parse_excluded_classes`.
        """
        # Fetch the class list once; it is checked against up to two rule sets.
        element_classes = element.get('class', [])
        if '*' in excluded_classes and any(cls in excluded_classes['*'] for cls in element_classes):
            return True
        return element.name in excluded_classes and \
            any(cls in excluded_classes[element.name] for cls in element_classes)

    def _extract_element_content(self, element):
        """Render ``element`` as a single flat ``<tag>text</tag>`` line.

        All text inside the element is collapsed to its stripped strings
        joined by single spaces; attributes and nested markup are dropped.

        Args:
            element: The tag to render.

        Returns:
            ``'<name>text</name>\\n'`` with ``text`` HTML-escaped.
        """
        content = ' '.join(element.stripped_strings)
        # The strings are already entity-decoded, so characters such as '<'
        # and '&' must be escaped again; otherwise literal text like
        # "use the <div> tag" would be read back as markup when the result is
        # re-parsed for chunking.
        return f'<{element.name}>{html.escape(content, quote=False)}</{element.name}>\n'