import io
import os
import subprocess
import tempfile
import time

import psutil
from flask import current_app
from pydub import AudioSegment

from common.extensions import minio_client
from common.utils.model_utils import get_transcription_model
from .processor_registry import ProcessorRegistry
from .transcription_processor import TranscriptionBaseProcessor
from common.utils.business_event_context import current_event


class AudioProcessor(TranscriptionBaseProcessor):
    """Processor that compresses an audio document and transcribes it.

    The audio file is downloaded from MinIO, re-encoded segment by segment
    through ffmpeg, stored back to MinIO as MP3, and then sent in chunks to
    the configured transcription model. The joined transcription text is
    stored in MinIO as well.
    """

    def __init__(self, tenant, document_version, catalog, processor):
        super().__init__(tenant, document_version, catalog, processor)
        self.transcription_model = get_transcription_model()
        self.ffmpeg_path = 'ffmpeg'
        self.max_compression_duration = current_app.config['MAX_COMPRESSION_DURATION']
        self.max_transcription_duration = current_app.config['MAX_TRANSCRIPTION_DURATION']
        # CPU usage limit in percentage
        self.compression_cpu_limit = current_app.config['COMPRESSION_CPU_LIMIT']
        # Delay between processing chunks in seconds
        self.compression_process_delay = current_app.config['COMPRESSION_PROCESS_DELAY']
        self.file_type = document_version.file_type

    def _get_transcription(self):
        """Download the document's audio, compress it and return its transcription."""
        file_data = minio_client.download_document_file(
            self.tenant.id,
            self.document_version.bucket_name,
            self.document_version.object_name,
        )

        with current_event.create_span("Audio Compression"):
            compressed_audio = self._compress_audio(file_data)
        with current_event.create_span("Audio Transcription"):
            transcription = self._transcribe_audio(compressed_audio)

        return transcription

    def _compress_audio(self, audio_data):
        """Compress raw audio bytes and upload the result to MinIO as MP3.

        The audio is processed in segments of ``max_compression_duration``
        seconds, pausing ``compression_process_delay`` seconds after each
        segment.

        Args:
            audio_data: Raw bytes of the audio file, in the document
                version's file type.

        Returns:
            The concatenated compressed audio as an ``AudioSegment``.

        Raises:
            Exception: Any error during processing is logged and re-raised.
        """
        file_type = self.document_version.file_type
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=f'.{file_type}')
        temp_file_path = temp_file.name

        try:
            # Writing inside the try block guarantees the temporary file is
            # removed even when the write itself fails.
            with temp_file:
                temp_file.write(audio_data)

            audio_info = AudioSegment.from_file(temp_file_path, format=file_type)
            total_duration = len(audio_info)
            self.log_tuning("_compress_audio", {
                "Audio Duration (ms)": total_duration,
            })

            segment_length = self.max_compression_duration * 1000  # Convert to milliseconds
            total_chunks = (total_duration + segment_length - 1) // segment_length

            compressed_segments = AudioSegment.empty()

            for i in range(total_chunks):
                self.log_tuning("_compress_audio", {
                    "Segment Nr": f"{i + 1} of {total_chunks}"
                })

                start_time = i * segment_length
                end_time = min((i + 1) * segment_length, total_duration)

                # Load only the current segment from disk.
                chunk = AudioSegment.from_file(
                    temp_file_path,
                    format=file_type,
                    start_second=start_time / 1000,
                    duration=(end_time - start_time) / 1000
                )

                compressed_chunk = self._compress_segment(chunk)
                compressed_segments += compressed_chunk

                time.sleep(self.compression_process_delay)

            # Save compressed audio to MinIO
            compressed_filename = f"{self.document_version.id}_compressed.mp3"
            with io.BytesIO() as compressed_buffer:
                compressed_segments.export(compressed_buffer, format="mp3")
                compressed_buffer.seek(0)
                minio_client.upload_document_file(
                    self.tenant.id,
                    self.document_version.doc_id,
                    self.document_version.language,
                    self.document_version.id,
                    compressed_filename,
                    compressed_buffer.read()
                )
            self.log_tuning("_compress_audio", {
                "Compressed audio to MinIO": compressed_filename
            })

            return compressed_segments

        except Exception as e:
            self._log(f"Error during audio processing: {str(e)}", level='error')
            raise
        finally:
            os.unlink(temp_file_path)  # Ensure the temporary file is deleted

    def _compress_segment(self, audio_segment):
        """Re-encode one audio segment to MP3 through ffmpeg.

        The segment is piped to ffmpeg as WAV; ffmpeg is started under
        ``nice -n 19`` and asked for 16 kHz mono audio at 32k bitrate with
        the ``loudnorm`` filter.

        Args:
            audio_segment: The ``AudioSegment`` to compress.

        Returns:
            The compressed audio as an ``AudioSegment``.

        Raises:
            RuntimeError: If ffmpeg exits with a non-zero return code.
        """
        with io.BytesIO() as segment_buffer:
            audio_segment.export(segment_buffer, format="wav")
            wav_bytes = segment_buffer.getvalue()

        command = [
            'nice', '-n', '19',
            self.ffmpeg_path,
            '-i', 'pipe:0',
            '-ar', '16000',
            '-ac', '1',
            '-b:a', '32k',
            '-filter:a', 'loudnorm',
            '-f', 'mp3',
            'pipe:1'
        ]

        process = psutil.Popen(
            command,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        stdout, stderr = process.communicate(input=wav_bytes)

        if process.returncode != 0:
            # Decode leniently so an undecodable byte in ffmpeg's output
            # cannot mask the actual failure.
            self._log(f"FFmpeg error: {stderr.decode(errors='replace')}", level='error')
            raise RuntimeError("FFmpeg compression failed")

        with io.BytesIO(stdout) as output_buffer:
            compressed_segment = AudioSegment.from_mp3(output_buffer)

        return compressed_segment

    def _transcribe_audio(self, audio_data):
        """Transcribe audio in chunks and store the joined text in MinIO.

        The audio is split into chunks of ``max_transcription_duration``
        seconds. A chunk whose transcription fails is logged and skipped, so
        the result may be partial.

        Args:
            audio_data: The audio to transcribe, as an ``AudioSegment``.

        Returns:
            The chunk transcriptions joined by spaces, or
            ``"No transcription available."`` when nothing was transcribed.
        """
        audio = audio_data
        audio_length = len(audio)
        segment_length = self.max_transcription_duration * 1000  # calculate milliseconds
        # Ceiling division, so an exact multiple of the segment length is not
        # counted as one chunk too many.
        total_chunks = (audio_length + segment_length - 1) // segment_length
        remainder_ms = audio_length % segment_length
        transcriptions = []

        for i, chunk in enumerate(audio[::segment_length]):
            # Only a trailing partial chunk is shorter than the maximum.
            if i == total_chunks - 1 and remainder_ms:
                segment_duration = remainder_ms // 1000
            else:
                segment_duration = self.max_transcription_duration

            # Reserve a file name only; the chunk is exported by path below.
            temp_audio = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False)
            temp_audio.close()
            temp_audio_path = temp_audio.name

            try:
                # Export errors propagate to the caller; only transcription
                # errors are treated as best-effort below.
                chunk.export(temp_audio_path, format="mp3")

                try:
                    with open(temp_audio_path, 'rb') as audio_file:
                        transcription = self.transcription_model.transcribe(
                            file=audio_file,
                            language=self.document_version.language,
                            response_format='verbose_json',
                            duration=segment_duration
                        )

                    if transcription:
                        # Handle the transcription result based on its type
                        if isinstance(transcription, str):
                            trans = transcription
                        elif hasattr(transcription, 'text'):
                            trans = transcription.text
                        else:
                            trans = str(transcription)

                        transcriptions.append(trans)

                        self.log_tuning("_transcribe_audio", {
                            "Chunk Nr": f"{i + 1} of {total_chunks}",
                            "Segment Duration": segment_duration,
                            "Transcription": trans,
                        })
                    else:
                        self._log("Warning: Received empty transcription", level='warning')
                        self.log_tuning("_transcribe_audio", {"ERROR": "No transcription"})

                except Exception as e:
                    self._log(f"Error during transcription: {str(e)}", level='error')
            finally:
                os.unlink(temp_audio_path)

        full_transcription = " ".join(filter(None, transcriptions))

        if not full_transcription:
            self._log("Warning: No transcription was generated", level='warning')
            full_transcription = "No transcription available."

        # Save transcription to MinIO
        transcription_filename = f"{self.document_version.id}_transcription.txt"
        minio_client.upload_document_file(
            self.tenant.id,
            self.document_version.doc_id,
            self.document_version.language,
            self.document_version.id,
            transcription_filename,
            full_transcription.encode('utf-8')
        )
        self.log_tuning("_transcribe_audio", {
            "Saved transcription to MinIO": transcription_filename
        })

        return full_transcription


# Register the processor
ProcessorRegistry.register("AUDIO_PROCESSOR", AudioProcessor)