import re

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough

from common.extensions import minio_client
from common.utils.model_utils import create_language_template

from .processor import Processor

# An SRT cue timing line, e.g. "00:00:01,000 --> 00:00:04,000". The
# millisecond separator may be "," (standard) or "." (emitted by some tools),
# and the spacing around the arrow is allowed to vary.
_TIMECODE_RE = re.compile(
    r'\d{2}:\d{2}:\d{2}[,.]\d{3}\s*-->\s*\d{2}:\d{2}:\d{2}[,.]\d{3}'
)
_WHITESPACE_RE = re.compile(r'\s+')


class SRTProcessor(Processor):
    """Turn an SRT subtitle file into a markdown document via an LLM.

    The helpers ``_log`` and ``_save_markdown`` are not defined here; they
    are expected to be provided by ``Processor``.
    """

    def __init__(self, tenant, model_variables, document_version):
        """Forward all arguments unchanged to ``Processor.__init__``."""
        super().__init__(tenant, model_variables, document_version)

    def process(self):
        """Download the SRT file, clean it and convert it to markdown.

        Returns:
            A ``(markdown, title)`` tuple.

        Raises:
            Exception: any error raised along the way is logged at level
                ``'error'`` and re-raised unchanged.
        """
        self._log("Starting SRT processing")
        try:
            file_data = minio_client.download_document_file(
                self.tenant.id,
                self.document_version.doc_id,
                self.document_version.language,
                self.document_version.id,
                self.document_version.file_name,
            )
            # 'utf-8-sig' decodes plain UTF-8 identically but also drops a
            # leading byte-order mark, which would otherwise glue itself to
            # the first cue number and leak into the transcript.
            # NOTE(review): assumes download_document_file returns bytes.
            srt_content = file_data.decode('utf-8-sig')
            cleaned_transcription = self._clean_srt(srt_content)
            markdown, title = self._generate_markdown_from_transcription(
                cleaned_transcription
            )
            self._save_markdown(markdown)
            self._log("Finished processing SRT")
            return markdown, title
        except Exception as e:
            self._log(f"Error processing SRT: {str(e)}", level='error')
            raise

    def _clean_srt(self, srt_content: str) -> str:
        """Strip SRT bookkeeping and return the spoken text as one line.

        Blank lines, timecode lines and cue index lines are removed; the
        remaining caption lines are joined with single spaces and runs of
        whitespace are collapsed.

        A digit-only line is treated as a cue index only when the line that
        follows it is a timecode. Digit-only caption text (for example a
        spoken "42") is therefore kept.

        Args:
            srt_content: Decoded contents of the SRT file.

        Returns:
            The cleaned transcription, or ``''`` if there is no caption text.
        """
        # Tolerate a BOM even when the caller decoded with plain 'utf-8'.
        # splitlines() also copes with "\r\n" and lone "\r" line endings.
        lines = [raw.strip() for raw in srt_content.lstrip('\ufeff').splitlines()]

        caption_lines = []
        for position, line in enumerate(lines):
            if not line or _TIMECODE_RE.match(line):
                continue
            if line.isdigit():
                following = lines[position + 1] if position + 1 < len(lines) else ''
                if _TIMECODE_RE.match(following):
                    # Cue index directly preceding its timing line.
                    continue
            caption_lines.append(line)

        return _WHITESPACE_RE.sub(' ', ' '.join(caption_lines)).strip()

    def _generate_markdown_from_transcription(self, transcription):
        """Run the transcript prompt through the LLM and extract a title.

        Reads ``model_variables['llm']`` and
        ``model_variables['transcript_template']``. The template is passed
        through ``create_language_template`` together with the document
        language before being used as a chat prompt.

        Args:
            transcription: Cleaned transcript text; supplied to the prompt
                under the key ``'transcript'``.

        Returns:
            A ``(markdown, title)`` tuple.
        """
        self._log("Generating markdown from transcription")

        llm = self.model_variables['llm']
        template = self.model_variables['transcript_template']
        language_template = create_language_template(
            template, self.document_version.language
        )
        transcript_prompt = ChatPromptTemplate.from_template(language_template)

        setup = RunnablePassthrough()
        output_parser = StrOutputParser()
        chain = setup | transcript_prompt | llm | output_parser

        input_transcript = {'transcript': transcription}
        markdown = chain.invoke(input_transcript)

        # Extract title from the markdown
        title = self._extract_title_from_markdown(markdown)
        return markdown, title

    def _extract_title_from_markdown(self, markdown: str) -> str:
        """Return the text of the first non-empty level-1 heading.

        Only lines that start with ``'# '`` are considered. A heading with
        no text after the marker is skipped rather than returned as an empty
        title.

        Returns:
            The heading text, or ``"Untitled SRT Transcription"`` when no
            usable heading exists.
        """
        for line in markdown.split('\n'):
            if line.startswith('# '):
                title = line[2:].strip()
                if title:
                    return title
        return "Untitled SRT Transcription"