Files
eveAI/eveai_workers/processors/transcription_processor.py
2024-12-05 15:19:37 +01:00

98 lines
4.0 KiB
Python

# transcription_processor.py
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from common.utils.model_utils import create_language_template
from .base_processor import BaseProcessor
from common.utils.business_event_context import current_event
class TranscriptionBaseProcessor(BaseProcessor):
def __init__(self, tenant, model_variables, document_version, catalog, processor):
super().__init__(tenant, model_variables, document_version, catalog, processor)
self.annotation_chunk_size = model_variables.annotation_chunk_length
self.annotation_chunk_overlap = 0
def process(self):
self._log("Starting Transcription processing")
try:
with current_event.create_span("Transcription Generation"):
transcription = self._get_transcription()
with current_event.create_span("Markdown Generation"):
chunks = self._chunk_transcription(transcription)
markdown_chunks = self._process_chunks(chunks)
full_markdown = self._combine_markdown_chunks(markdown_chunks)
self._save_markdown(full_markdown)
self._log("Finished processing Transcription")
return full_markdown, self._extract_title_from_markdown(full_markdown)
except Exception as e:
self._log(f"Error processing Transcription: {str(e)}", level='error')
raise
def _get_transcription(self):
# This method should be implemented by child classes
raise NotImplementedError
def _chunk_transcription(self, transcription):
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=self.annotation_chunk_size,
chunk_overlap=self.annotation_chunk_overlap,
length_function=len,
separators=["\n\n", "\n", " ", ""]
)
return text_splitter.split_text(transcription)
def _process_chunks(self, chunks):
self.log_tuning("_process_chunks", {"Nr of Chunks": len(chunks)})
llm = self.model_variables.get_llm()
template = self.model_variables.get_template('transcript')
language_template = create_language_template(template, self.document_version.language)
transcript_prompt = ChatPromptTemplate.from_template(language_template)
setup = RunnablePassthrough()
output_parser = StrOutputParser()
chain = setup | transcript_prompt | llm | output_parser
markdown_chunks = []
previous_part = ""
for i, chunk in enumerate(chunks):
input_transcript = {
'transcript': chunk,
'previous_part': previous_part
}
markdown = chain.invoke(input_transcript)
markdown = self._clean_markdown(markdown)
self.log_tuning("_process_chunks", {
"Chunk Number": f"{i + 1} of {len(chunks)}",
"Chunk": chunk,
"Previous Chunk": previous_part,
"Markdown": markdown,
})
markdown_chunks.append(markdown)
# Extract the last part for the next iteration
lines = markdown.split('\n')
last_header = None
for line in reversed(lines):
if line.startswith('#'):
last_header = line
break
if last_header:
header_index = lines.index(last_header)
previous_part = '\n'.join(lines[header_index:])
else:
previous_part = lines[-1] if lines else ""
return markdown_chunks
def _combine_markdown_chunks(self, markdown_chunks):
return "\n\n".join(markdown_chunks)
def _extract_title_from_markdown(self, markdown):
lines = markdown.split('\n')
for line in lines:
if line.startswith('# '):
return line[2:].strip()
return "Untitled Transcription"