98 lines
4.1 KiB
Python
98 lines
4.1 KiB
Python
# transcription_processor.py
|
|
from langchain_text_splitters import RecursiveCharacterTextSplitter
|
|
from langchain_core.output_parsers import StrOutputParser
|
|
from langchain_core.prompts import ChatPromptTemplate
|
|
from langchain_core.runnables import RunnablePassthrough
|
|
|
|
from common.utils.model_utils import create_language_template, get_embedding_llm
|
|
from .base_processor import BaseProcessor
|
|
from common.utils.business_event_context import current_event
|
|
|
|
|
|
class TranscriptionBaseProcessor(BaseProcessor):
    """Base processor that turns a transcription into a markdown document.

    The transcription text is obtained from ``_get_transcription`` (which
    child classes must implement), split into chunks, converted to markdown
    chunk by chunk through an LLM chain, and finally joined and saved.

    NOTE(review): ``_log``, ``log_tuning``, ``_clean_markdown``,
    ``_save_markdown``, ``model_variables`` and ``document_version`` are not
    defined in this class; presumably they come from ``BaseProcessor`` --
    confirm against that class.
    """

    def __init__(self, tenant, model_variables, document_version, catalog, processor):
        """Initialise the base processor and the chunking parameters.

        All arguments are forwarded unchanged to ``BaseProcessor.__init__``.
        The chunk size is read from ``model_variables.annotation_chunk_length``;
        chunks never overlap.
        """
        super().__init__(tenant, model_variables, document_version, catalog, processor)
        self.annotation_chunk_size = model_variables.annotation_chunk_length
        self.annotation_chunk_overlap = 0

    def process(self):
        """Run the full transcription-to-markdown pipeline.

        Returns:
            A ``(full_markdown, title)`` tuple, where ``title`` is taken from
            the first level-1 header of the markdown (or a default title).

        Raises:
            Exception: any error raised by a pipeline step is logged at
                ``error`` level and re-raised unchanged.
        """
        self._log("Starting Transcription processing")
        try:
            with current_event.create_span("Transcription Generation"):
                transcription = self._get_transcription()

            with current_event.create_span("Markdown Generation"):
                chunks = self._chunk_transcription(transcription)
                markdown_chunks = self._process_chunks(chunks)
                full_markdown = self._combine_markdown_chunks(markdown_chunks)
                self._save_markdown(full_markdown)

            self._log("Finished processing Transcription")
            return full_markdown, self._extract_title_from_markdown(full_markdown)
        except Exception as e:
            # Log at this boundary for traceability, then let the caller decide.
            self._log(f"Error processing Transcription: {e}", level='error')
            raise

    def _get_transcription(self):
        """Return the raw transcription text; must be implemented by child classes."""
        raise NotImplementedError(
            f"{type(self).__name__} must implement _get_transcription()"
        )

    def _chunk_transcription(self, transcription):
        """Split the transcription into a list of text chunks.

        Splitting prefers paragraph breaks, then line breaks, then spaces, and
        finally falls back to splitting between characters. Chunk length is
        measured with ``len`` and limited to ``self.annotation_chunk_size``.
        """
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.annotation_chunk_size,
            chunk_overlap=self.annotation_chunk_overlap,
            length_function=len,
            separators=["\n\n", "\n", " ", ""],
        )
        return text_splitter.split_text(transcription)

    def _process_chunks(self, chunks):
        """Convert each transcript chunk to markdown using the LLM chain.

        Every chunk is sent to the chain together with ``previous_part``: the
        tail of the markdown generated for the previous chunk (empty for the
        first chunk), so the prompt can continue where the last one stopped.

        Args:
            chunks: list of transcript text chunks, in document order.

        Returns:
            A list with the cleaned markdown produced for each chunk, in the
            same order as ``chunks``.
        """
        self.log_tuning("_process_chunks", {"Nr of Chunks": len(chunks)})

        llm = get_embedding_llm()
        template = self.model_variables.get_template('transcript')
        language_template = create_language_template(template, self.document_version.language)
        transcript_prompt = ChatPromptTemplate.from_template(language_template)
        setup = RunnablePassthrough()
        output_parser = StrOutputParser()

        chain = setup | transcript_prompt | llm | output_parser

        total_chunks = len(chunks)
        markdown_chunks = []
        previous_part = ""
        for i, chunk in enumerate(chunks):
            input_transcript = {
                'transcript': chunk,
                'previous_part': previous_part,
            }
            markdown = chain.invoke(input_transcript)
            markdown = self._clean_markdown(markdown)
            self.log_tuning("_process_chunks", {
                "Chunk Number": f"{i + 1} of {total_chunks}",
                "Chunk": chunk,
                "Previous Chunk": previous_part,
                "Markdown": markdown,
            })
            markdown_chunks.append(markdown)

            # Extract the last part for the next iteration
            previous_part = self._extract_previous_part(markdown)

        return markdown_chunks

    @staticmethod
    def _extract_previous_part(markdown):
        """Return the tail of ``markdown`` used as context for the next chunk.

        The tail starts at the LAST line beginning with ``#`` and runs to the
        end of the text. When no such line exists, only the final line is
        returned.
        """
        lines = markdown.split('\n')
        # Walk indices from the end so the position of the last header is
        # known directly. Looking the header text up with ``list.index`` would
        # return the first line with identical text, which is wrong whenever
        # the same header occurs more than once.
        for index in range(len(lines) - 1, -1, -1):
            if lines[index].startswith('#'):
                return '\n'.join(lines[index:])
        # ``str.split`` with an explicit separator always yields at least one
        # element, so ``lines[-1]`` is safe (it is "" for empty markdown).
        return lines[-1]

    def _combine_markdown_chunks(self, markdown_chunks):
        """Join the per-chunk markdown into one document, separated by blank lines."""
        return "\n\n".join(markdown_chunks)

    def _extract_title_from_markdown(self, markdown):
        """Return the text of the first level-1 header (``# ...``) in ``markdown``.

        Falls back to ``"Untitled Transcription"`` when there is no such header.
        """
        for line in markdown.split('\n'):
            if line.startswith('# '):
                return line[2:].strip()
        return "Untitled Transcription"