from html import escape

from bs4 import BeautifulSoup
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough

from common.extensions import db, minio_client
from common.utils.model_utils import create_language_template
from .processor import Processor


class HTMLProcessor(Processor):
    """Convert a stored HTML document into markdown with the help of an LLM.

    The processor downloads the raw HTML, keeps only the configured tags
    (minus excluded elements/classes), flattens them into a simplified HTML
    string, splits that string into size-bounded chunks and asks the LLM
    chain to turn each chunk into markdown.
    """

    # Tags that are always treated as chunkable units by `_split_content`.
    _DEFAULT_SPLIT_TAGS = ('h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'div', 'span', 'table')
    # Headings after which an oversized chunk is flushed immediately.
    _HEADING_BREAK_TAGS = ('h1', 'h2', 'h3')

    def __init__(self, tenant, model_variables, document_version):
        """Store the HTML-related settings found in ``model_variables``.

        Args:
            tenant: Tenant object; ``id`` and ``html_excluded_classes`` are read
                by this class.
            model_variables: Mapping that must contain the keys ``html_tags``,
                ``html_end_tags``, ``html_included_elements`` and
                ``html_excluded_elements`` (and, later on, ``llm`` and
                ``html_parse_template``).
            document_version: Document version whose file is processed.

        Raises:
            KeyError: If one of the ``html_*`` keys is missing.
        """
        super().__init__(tenant, model_variables, document_version)
        self.html_tags = model_variables['html_tags']
        self.html_end_tags = model_variables['html_end_tags']
        self.html_included_elements = model_variables['html_included_elements']
        self.html_excluded_elements = model_variables['html_excluded_elements']

    def process(self):
        """Download, parse and convert the document; return ``(markdown, title)``.

        Any exception is logged at error level and re-raised unchanged.
        """
        self._log("Starting HTML processing")
        try:
            file_data = minio_client.download_document_file(
                self.tenant.id,
                self.document_version.doc_id,
                self.document_version.language,
                self.document_version.id,
                self.document_version.file_name
            )
            html_content = file_data.decode('utf-8')

            extracted_html, title = self._parse_html(html_content)
            markdown = self._generate_markdown_from_html(extracted_html)

            self._save_markdown(markdown)
            self._log("Finished processing HTML")
            return markdown, title
        except Exception as e:
            self._log(f"Error processing HTML: {str(e)}", level='error')
            raise

    def _parse_html(self, html_content):
        """Extract the relevant elements from ``html_content``.

        Returns:
            A tuple ``(extracted_html, title)`` where ``extracted_html`` is the
            concatenation of the simplified elements and ``title`` is the text
            of the ``<title>`` tag, or ``''`` when the document has none.
        """
        self._log(f'Parsing HTML for tenant {self.tenant.id}')
        soup = BeautifulSoup(html_content, 'html.parser')
        excluded_classes = self._parse_excluded_classes(self.tenant.html_excluded_classes)

        if self.html_included_elements:
            elements_to_parse = soup.find_all(self.html_included_elements)
        else:
            elements_to_parse = [soup]

        # Collect fragments in a list and join once instead of repeated `+=`.
        fragments = []
        for element in elements_to_parse:
            for sub_element in element.find_all(self.html_tags):
                if self._should_exclude_element(sub_element, excluded_classes):
                    continue
                fragments.append(self._extract_element_content(sub_element))
        extracted_html = ''.join(fragments)

        # Look the title up once; compare against None explicitly rather than
        # relying on the truthiness of the tag object.
        title_tag = soup.find('title')
        title = title_tag.get_text(strip=True) if title_tag is not None else ''

        self._log(f'Finished parsing HTML for tenant {self.tenant.id}')
        return extracted_html, title

    def _generate_markdown_from_html(self, html_content):
        """Run every chunk of ``html_content`` through the LLM chain.

        The prompt template is read from ``model_variables['html_parse_template']``
        and receives the chunk under the ``html`` key. The per-chunk results are
        joined with blank lines.
        """
        self._log(f'Generating markdown from HTML for tenant {self.tenant.id}')

        llm = self.model_variables['llm']
        template = self.model_variables['html_parse_template']
        parse_prompt = ChatPromptTemplate.from_template(template)
        setup = RunnablePassthrough()
        output_parser = StrOutputParser()
        chain = setup | parse_prompt | llm | output_parser

        soup = BeautifulSoup(html_content, 'lxml')
        chunks = self._split_content(soup)

        markdown_chunks = []
        for chunk in chunks:
            # NOTE(review): `embed_tuning` is not set in this class; presumably
            # it is provided by the Processor base class - confirm.
            if self.embed_tuning:
                self._log(f'Processing chunk: \n{chunk}\n')
            input_html = {"html": chunk}
            markdown_chunk = chain.invoke(input_html)
            markdown_chunks.append(markdown_chunk)
            if self.embed_tuning:
                self._log(f'Processed markdown chunk: \n{markdown_chunk}\n')

        markdown = "\n\n".join(markdown_chunks)
        self._log(f'Finished generating markdown from HTML for tenant {self.tenant.id}')
        return markdown

    def _split_tag_names(self):
        """Return the tag names `_split_content` treats as chunkable units.

        The fixed default set is extended with the configured ``html_tags`` so
        that every tag emitted by `_extract_element_content` is picked up again
        when the simplified HTML is split.
        """
        configured = getattr(self, 'html_tags', None) or []
        if isinstance(configured, str):
            configured = [configured]
        names = list(self._DEFAULT_SPLIT_TAGS)
        names.extend(tag for tag in configured if tag not in names)
        return names

    def _split_content(self, soup, max_size=20000):
        """Split ``soup`` into HTML strings of roughly at most ``max_size`` chars.

        Elements are never cut: a single element larger than ``max_size``
        becomes a chunk of its own. Elements nested inside another selected
        element are skipped because the ancestor's serialization already
        contains them; emitting them again would duplicate content.

        Args:
            soup: Parsed document to split.
            max_size: Soft upper bound on the length of a chunk, measured on
                the serialized HTML.

        Returns:
            List of HTML strings, in document order.
        """
        split_tags = self._split_tag_names()
        chunks = []
        current_chunk = []
        current_size = 0

        for element in soup.find_all(split_tags):
            if element.find_parent(split_tags) is not None:
                continue

            element_html = str(element)
            element_size = len(element_html)

            # Flush first when adding this element would overflow the chunk.
            if current_size + element_size > max_size and current_chunk:
                chunks.append(''.join(current_chunk))
                current_chunk = []
                current_size = 0

            current_chunk.append(element_html)
            current_size += element_size

            # An oversized major heading is flushed on its own right away.
            if element.name in self._HEADING_BREAK_TAGS and current_size > max_size:
                chunks.append(''.join(current_chunk))
                current_chunk = []
                current_size = 0

        if current_chunk:
            chunks.append(''.join(current_chunk))
        return chunks

    def _parse_excluded_classes(self, excluded_classes):
        """Turn ``"element.class"`` rules into ``{element: {class, ...}}``.

        Each rule is split on its first dot. ``None`` or an empty collection
        yields an empty mapping.

        Raises:
            ValueError: If a rule contains no dot.
        """
        parsed = {}
        for rule in excluded_classes or ():
            element, cls = rule.split('.', 1)
            parsed.setdefault(element, set()).add(cls)
        return parsed

    def _should_exclude_element(self, element, excluded_classes):
        """Return True when ``element`` must be left out of the extraction.

        An element is excluded when one of its ancestors matches
        ``html_excluded_elements``, or when the element or one of its
        ancestors matches a class-exclusion rule.
        """
        if self.html_excluded_elements and element.find_parent(self.html_excluded_elements):
            return True
        return self._is_element_excluded_by_class(element, excluded_classes)

    def _is_element_excluded_by_class(self, element, excluded_classes):
        """Check the ancestors of ``element`` and then ``element`` itself against the class rules."""
        for parent in element.parents:
            if self._element_matches_exclusion(parent, excluded_classes):
                return True
        return self._element_matches_exclusion(element, excluded_classes)

    def _element_matches_exclusion(self, element, excluded_classes):
        """Return True when ``element`` carries a class excluded for its tag name or for ``'*'``."""
        element_classes = element.get('class', [])
        if '*' in excluded_classes and any(cls in excluded_classes['*'] for cls in element_classes):
            return True
        return element.name in excluded_classes and \
            any(cls in excluded_classes[element.name] for cls in element_classes)

    def _extract_element_content(self, element):
        """Serialize ``element`` as ``<name>text</name>`` followed by a newline.

        The text is the element's stripped strings joined by single spaces.
        It is escaped so that literal ``<``, ``>`` and ``&`` characters survive
        the later re-parse as text, and the tag is closed so that following
        elements are not nested inside it.
        """
        content = ' '.join(child.strip() for child in element.stripped_strings)
        return f'<{element.name}>{escape(content, quote=False)}</{element.name}>\n'