163 lines
7.0 KiB
Python
163 lines
7.0 KiB
Python
from bs4 import BeautifulSoup
|
|
from langchain_core.output_parsers import StrOutputParser
|
|
from langchain_core.prompts import ChatPromptTemplate
|
|
from langchain_core.runnables import RunnablePassthrough
|
|
from common.extensions import db, minio_client
|
|
from common.utils.model_utils import create_language_template, get_embedding_llm
|
|
from .base_processor import BaseProcessor
|
|
from common.utils.business_event_context import current_event
|
|
from .processor_registry import ProcessorRegistry
|
|
from common.utils.string_list_converter import StringListConverter as SLC
|
|
|
|
|
|
class HTMLProcessor(BaseProcessor):
    """Processor that converts an HTML document into markdown.

    Extracts the configured tags from an HTML file stored in MinIO,
    filters out excluded elements/classes, then runs the extracted HTML
    through an LLM chain to produce markdown.
    """

    def __init__(self, tenant, model_variables, document_version, catalog, processor):
        """Load tag/element filter settings from the processor configuration
        and the chunk-size limit from the catalog.
        """
        super().__init__(tenant, model_variables, document_version, catalog, processor)
        proc_conf = processor.configuration
        # NOTE: the original also read catalog.configuration into an unused
        # local (`cat_conf`); only catalog.max_chunk_size is actually needed.
        # Comma-separated configuration strings are expanded into lists.
        self.html_tags = SLC.string_to_list(proc_conf['html_tags'])
        self.html_end_tags = SLC.string_to_list(proc_conf['html_end_tags'])
        self.html_included_elements = SLC.string_to_list(proc_conf['html_included_elements'])
        self.html_excluded_elements = SLC.string_to_list(proc_conf['html_excluded_elements'])
        self.html_excluded_classes = SLC.string_to_list(proc_conf['html_excluded_classes'])
        self.tuning = self.processor.tuning

        # Verification logging so operators can confirm the active configuration.
        self._log(f"HTML Processor initialized with tuning={self.tuning}")
        if self.tuning:
            self.log_tuning("HTML Processor initialized", {
                "html_tags": self.html_tags,
                "html_end_tags": self.html_end_tags,
                "included_elements": self.html_included_elements,
                "excluded_elements": self.html_excluded_elements
            })

        # Upper bound (in characters) for each HTML chunk sent to the LLM.
        self.chunk_size = catalog.max_chunk_size
def process(self):
    """Download the HTML document, convert it to markdown, and persist it.

    Returns:
        tuple: (markdown string, document title — '' when no <title> tag).

    Raises:
        Exception: any failure is logged at error level and re-raised.
    """
    self._log("Starting HTML processing")
    try:
        payload = minio_client.download_document_file(
            self.tenant.id,
            self.document_version.bucket_name,
            self.document_version.object_name,
        )
        html_content = payload.decode('utf-8')

        # Each major stage gets its own tracing span.
        with current_event.create_span("HTML Content Extraction"):
            extracted_html, title = self._parse_html(html_content)
        with current_event.create_span("Markdown Generation"):
            markdown = self._generate_markdown_from_html(extracted_html)

        self._save_markdown(markdown)
        self._log("Finished processing HTML")
        return markdown, title
    except Exception as err:
        self._log(f"Error processing HTML: {str(err)}", level='error')
        raise
def _parse_html(self, html_content):
    """Extract the configured tags from raw HTML.

    Walks the elements selected by ``html_included_elements`` (or the
    whole document when none are configured), skips anything inside an
    excluded element or carrying an excluded class, and re-emits the
    surviving tags as simplified HTML.

    Args:
        html_content: Raw HTML string.

    Returns:
        tuple: (simplified HTML string, document title or '').
    """
    self._log(f'Parsing HTML for tenant {self.tenant.id}')
    soup = BeautifulSoup(html_content, 'html.parser')
    excluded_classes = self._parse_excluded_classes(self.html_excluded_classes)

    if self.html_included_elements:
        elements_to_parse = soup.find_all(self.html_included_elements)
    else:
        # No explicit inclusions: scan the whole document.
        elements_to_parse = [soup]

    # Collect fragments in a list and join once at the end; the original
    # used `extracted_html +=` in the loop, which is quadratic.
    fragments = []
    for element in elements_to_parse:
        for sub_element in element.find_all(self.html_tags):
            if self._should_exclude_element(sub_element, excluded_classes):
                continue
            fragments.append(self._extract_element_content(sub_element))
    extracted_html = ''.join(fragments)

    # Look up the <title> element once instead of twice.
    title_tag = soup.find('title')
    title = title_tag.get_text(strip=True) if title_tag else ''

    self._log(f'Finished parsing HTML for tenant {self.tenant.id}')
    self.log_tuning("_parse_html", {"extracted_html": extracted_html, "title": title})
    return extracted_html, title
def _generate_markdown_from_html(self, html_content):
    """Convert extracted HTML to markdown via an LLM chain.

    The HTML is split into chunks bounded by ``self.chunk_size`` and each
    chunk is pushed through the prompt | llm | parser chain; the markdown
    pieces are joined with blank lines.
    """
    self._log(f'Generating markdown from HTML for tenant {self.tenant.id}')

    # NOTE(review): get_embedding_llm() supplies the chat model here —
    # confirm this factory is intended for a generation (not embedding) task.
    llm = get_embedding_llm()
    prompt = ChatPromptTemplate.from_template(
        self.model_variables.get_template("html_parse")
    )
    chain = RunnablePassthrough() | prompt | llm | StrOutputParser()

    soup = BeautifulSoup(html_content, 'lxml')
    html_chunks = self._split_content(soup, self.chunk_size)

    markdown_chunks = []
    for html_chunk in html_chunks:
        rendered = chain.invoke({"html": html_chunk})
        markdown_chunks.append(rendered)
        self.log_tuning("_generate_markdown_from_html", {"chunk": html_chunk, "markdown_chunk": rendered})

    markdown = "\n\n".join(markdown_chunks)
    self._log(f'Finished generating markdown from HTML for tenant {self.tenant.id}')
    return markdown
def _split_content(self, soup, max_size=20000):
    """Split a parsed document into HTML chunks of at most ``max_size`` chars.

    Elements are accumulated in document order; a chunk is flushed when
    the next element would push it past ``max_size``, and also right
    after an h1-h3 heading once the chunk is already over the limit.

    Args:
        soup: BeautifulSoup document (or element) to split.
        max_size: Soft upper bound on chunk length in characters.

    Returns:
        list[str]: HTML chunks.
    """
    chunks = []
    current_parts = []   # stringified elements of the chunk being built
    current_size = 0

    for element in soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'div', 'span', 'table']):
        # Stringify once and reuse for both sizing and output; the
        # original re-stringified every element at flush time.
        element_html = str(element)
        element_size = len(element_html)

        # Flush before adding if this element would exceed the limit.
        if current_size + element_size > max_size and current_parts:
            chunks.append(''.join(current_parts))
            current_parts = []
            current_size = 0

        current_parts.append(element_html)
        current_size += element_size

        # Flush after a major heading when the chunk is oversized.
        # NOTE(review): this leaves the heading at the END of the flushed
        # chunk, separated from its section body — confirm intended.
        if element.name in ['h1', 'h2', 'h3'] and current_size > max_size:
            chunks.append(''.join(current_parts))
            current_parts = []
            current_size = 0

    if current_parts:
        chunks.append(''.join(current_parts))

    return chunks
def _parse_excluded_classes(self, excluded_classes):
    """Parse 'element.class' exclusion rules into a lookup table.

    Args:
        excluded_classes: Iterable of rules such as 'div.ad' or
            '*.hidden' ('*' matches any element name).

    Returns:
        dict: element name -> set of excluded class names.
    """
    parsed = {}
    if excluded_classes:
        for rule in excluded_classes:
            # Guard against malformed configuration: a rule with no '.'
            # separator previously raised ValueError and aborted the
            # whole document; skip it and log instead.
            if '.' not in rule:
                self._log(f"Ignoring malformed excluded-class rule: {rule!r}")
                continue
            element, cls = rule.split('.', 1)
            parsed.setdefault(element, set()).add(cls)
    return parsed
def _should_exclude_element(self, element, excluded_classes):
    """Return True when *element* must be skipped during extraction.

    An element is skipped when it sits inside one of the configured
    excluded elements, or when it (or an ancestor) carries an excluded
    CSS class.
    """
    inside_excluded_parent = bool(
        self.html_excluded_elements
        and element.find_parent(self.html_excluded_elements)
    )
    if inside_excluded_parent:
        return True
    return self._is_element_excluded_by_class(element, excluded_classes)
def _is_element_excluded_by_class(self, element, excluded_classes):
    """Return True when the element or any ancestor matches a class rule."""
    # The element itself and every ancestor are tested against the same
    # rules; a match anywhere in the chain excludes the element.
    if self._element_matches_exclusion(element, excluded_classes):
        return True
    return any(
        self._element_matches_exclusion(ancestor, excluded_classes)
        for ancestor in element.parents
    )
def _element_matches_exclusion(self, element, excluded_classes):
    """Return True when one of the element's CSS classes is excluded for
    its tag name, or under the wildcard '*' rule."""
    classes = element.get('class', [])
    # Wildcard rules apply to every tag; tag-specific rules only to this one.
    for rule_key in ('*', element.name):
        banned = excluded_classes.get(rule_key)
        if banned and any(cls in banned for cls in classes):
            return True
    return False
def _extract_element_content(self, element):
    """Return the element's text wrapped in a simplified tag.

    All descendant text is flattened into one space-separated string;
    attributes and nested markup are dropped.
    """
    # stripped_strings already yields whitespace-stripped fragments, so
    # the per-item .strip() in the original was redundant.
    content = ' '.join(element.stripped_strings)
    return f'<{element.name}>{content}</{element.name}>\n'
# Register this processor under its lookup key so the registry can
# instantiate it for documents configured with "HTML_PROCESSOR".
ProcessorRegistry.register("HTML_PROCESSOR", HTMLProcessor)