- Introduction of API functionality (to be continued). Deduplication of document and URL uploads between views and API. - Improvements to document processing - Introduction of processor classes to streamline document inputs - Removed pure YouTube functionality, as YouTube retrieval of documents continuously changes, but added upload of SRT, MP3, OGG and MP4 files
81 lines
3.0 KiB
Python
81 lines
3.0 KiB
Python
import re
|
|
from langchain_core.output_parsers import StrOutputParser
|
|
from langchain_core.prompts import ChatPromptTemplate
|
|
from langchain_core.runnables import RunnablePassthrough
|
|
from common.extensions import minio_client
|
|
from common.utils.model_utils import create_language_template
|
|
from .processor import Processor
|
|
|
|
|
|
class SRTProcessor(Processor):
    """Convert an SRT subtitle file into markdown via the configured LLM.

    The file is downloaded through ``minio_client``, cue numbers and
    timecodes are stripped, and the remaining text is sent through a
    prompt | llm | string-parser chain built from ``model_variables``.

    NOTE(review): ``self.tenant``, ``self.model_variables``,
    ``self.document_version``, ``self._log`` and ``self._save_markdown``
    are presumably provided by ``Processor`` -- confirm against the base
    class.
    """

    # Cue timing line, e.g. "00:00:01,000 --> 00:00:04,000". The canonical
    # SRT millisecond separator is a comma; a dot is accepted as well
    # because some exporters emit it.
    _TIMECODE_PATTERN = re.compile(
        r'\d{2}:\d{2}:\d{2}[,.]\d{3} --> \d{2}:\d{2}:\d{2}[,.]\d{3}'
    )

    # Title used when the generated markdown has no usable level-1 heading.
    _DEFAULT_TITLE = "Untitled SRT Transcription"

    def __init__(self, tenant, model_variables, document_version):
        """Forward all arguments unchanged to ``Processor.__init__``."""
        super().__init__(tenant, model_variables, document_version)

    def process(self):
        """Download the SRT file, convert it to markdown and save the result.

        Returns:
            A ``(markdown, title)`` tuple.

        Raises:
            Exception: any error raised while downloading, decoding,
                generating or saving is logged at level ``'error'`` and
                re-raised unchanged.
        """
        self._log("Starting SRT processing")
        try:
            file_data = minio_client.download_document_file(
                self.tenant.id,
                self.document_version.doc_id,
                self.document_version.language,
                self.document_version.id,
                self.document_version.file_name
            )

            # 'utf-8-sig' decodes plain UTF-8 and additionally drops a
            # leading byte-order mark. With plain 'utf-8' the BOM stays glued
            # to the first cue number, which then fails the isdigit() filter
            # and leaks into the transcript.
            srt_content = file_data.decode('utf-8-sig')
            cleaned_transcription = self._clean_srt(srt_content)
            markdown, title = self._generate_markdown_from_transcription(cleaned_transcription)

            self._save_markdown(markdown)
            self._log("Finished processing SRT")
            return markdown, title
        except Exception as e:
            self._log(f"Error processing SRT: {e!s}", level='error')
            raise

    def _clean_srt(self, srt_content):
        """Return the spoken text of ``srt_content`` as one single-spaced line.

        Blank lines, lines consisting only of digits (cue numbers) and
        timecode lines are dropped; everything else is joined with spaces.

        NOTE: a subtitle line whose text is purely numeric (e.g. "2024") is
        indistinguishable from a cue number here and is dropped as well.
        """
        cleaned_lines = []
        # splitlines() copes with "\n", "\r\n" and bare "\r" line endings.
        for raw_line in srt_content.splitlines():
            line = raw_line.strip()
            if not line or line.isdigit():
                continue
            # Match on the stripped line so an indented timing line is
            # still recognised.
            if self._TIMECODE_PATTERN.match(line):
                continue
            cleaned_lines.append(line)

        # Join the cue texts and collapse any run of whitespace.
        cleaned_text = ' '.join(cleaned_lines)
        return re.sub(r'\s+', ' ', cleaned_text).strip()

    def _generate_markdown_from_transcription(self, transcription):
        """Run the transcript through the LLM chain and derive a title.

        Reads ``model_variables['llm']`` and
        ``model_variables['transcript_template']``; the template is passed
        through ``create_language_template`` together with the document
        version's language before the prompt is built.

        Args:
            transcription: cleaned transcript text, supplied to the prompt
                under the ``transcript`` key.

        Returns:
            A ``(markdown, title)`` tuple, where ``title`` is taken from the
            generated markdown.
        """
        self._log("Generating markdown from transcription")
        llm = self.model_variables['llm']
        template = self.model_variables['transcript_template']
        language_template = create_language_template(template, self.document_version.language)
        transcript_prompt = ChatPromptTemplate.from_template(language_template)
        setup = RunnablePassthrough()
        output_parser = StrOutputParser()

        chain = setup | transcript_prompt | llm | output_parser

        input_transcript = {'transcript': transcription}
        markdown = chain.invoke(input_transcript)

        # Extract title from the markdown
        title = self._extract_title_from_markdown(markdown)

        return markdown, title

    def _extract_title_from_markdown(self, markdown):
        """Return the text of the first non-empty ``# `` heading.

        Falls back to "Untitled SRT Transcription" when no line starts with
        ``# `` or when every such heading is empty.
        """
        for line in markdown.split('\n'):
            if line.startswith('# '):
                title = line[2:].strip()
                # Skip a bare "# " so callers never receive an empty title.
                if title:
                    return title
        return self._DEFAULT_TITLE
|
|
|
|
|