import io
import os
import subprocess
import tempfile
import time

import psutil
from pydub import AudioSegment

from common.extensions import minio_client
from .transcription_processor import TranscriptionProcessor
from common.utils.business_event_context import current_event


class AudioProcessor(TranscriptionProcessor):
    """Compress an audio document with ffmpeg and transcribe it in segments.

    The pipeline (see ``_get_transcription``) downloads the document file,
    re-encodes it to a mono 16 kHz / 32 kbit/s MP3, uploads that compressed
    copy, transcribes it segment by segment, and uploads the joined text.

    NOTE(review): ``self.tenant``, ``self.document_version``,
    ``self.model_variables`` and ``self._log`` are not defined in this file;
    presumably they are provided by ``TranscriptionProcessor`` -- confirm.
    """

    def __init__(self, tenant, model_variables, document_version):
        """Read the transcription and compression settings.

        Args:
            tenant: Tenant owning the document (its ``id`` is used for storage).
            model_variables: Mapping-like settings object. Must provide
                ``transcription_client``, ``transcription_model``,
                ``max_compression_duration`` and ``max_transcription_duration``;
                ``compression_cpu_limit`` and ``compression_process_delay``
                are optional.
            document_version: Document version whose file is processed.

        Raises:
            KeyError: If a required setting is missing (assuming
                ``model_variables`` behaves like a dict -- TODO confirm).
        """
        super().__init__(tenant, model_variables, document_version)
        # NOTE(review): stored but not referenced elsewhere in this class.
        self.transcription_client = model_variables['transcription_client']
        self.transcription_model = model_variables['transcription_model']
        self.ffmpeg_path = 'ffmpeg'
        # Both durations are multiplied by 1000 and used as millisecond
        # segment lengths, so they are expressed in seconds.
        self.max_compression_duration = model_variables['max_compression_duration']
        self.max_transcription_duration = model_variables['max_transcription_duration']
        # CPU usage limit in percentage
        # NOTE(review): not referenced elsewhere in this class.
        self.compression_cpu_limit = model_variables.get('compression_cpu_limit', 50)
        # Delay between processing chunks in seconds
        self.compression_process_delay = model_variables.get('compression_process_delay', 0.1)
        self.file_type = document_version.file_type

    def _get_transcription(self):
        """Download the document's audio, compress it, and return its transcription."""
        file_data = minio_client.download_document_file(
            self.tenant.id,
            self.document_version.bucket_name,
            self.document_version.object_name,
        )

        with current_event.create_span("Audio Compression"):
            compressed_audio = self._compress_audio(file_data)
        with current_event.create_span("Audio Transcription"):
            transcription = self._transcribe_audio(compressed_audio)

        return transcription

    def _compress_audio(self, audio_data):
        """Compress raw audio bytes segment by segment and upload the result.

        Args:
            audio_data: Bytes of the original audio file, in the format named
                by ``self.document_version.file_type``.

        Returns:
            A pydub ``AudioSegment`` holding all compressed segments
            concatenated in order.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        self._log("Compressing audio")
        file_type = self.document_version.file_type

        # Create the file first and write inside the try block so the file is
        # removed even when the write itself fails.
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=f'.{file_type}')
        temp_file_path = temp_file.name

        try:
            with temp_file:
                temp_file.write(audio_data)

            # The whole file is decoded once here only to learn its duration.
            self._log("Creating AudioSegment from file")
            audio_info = AudioSegment.from_file(temp_file_path, format=file_type)
            self._log("Finished creating AudioSegment from file")
            total_duration = len(audio_info)
            self._log(f"Audio duration: {total_duration / 1000} seconds")

            segment_length = self.max_compression_duration * 1000  # Convert to milliseconds
            # Ceiling division: a trailing partial segment counts as a chunk.
            total_chunks = (total_duration + segment_length - 1) // segment_length

            compressed_segments = AudioSegment.empty()

            for i in range(total_chunks):
                self._log(f"Compressing segment {i + 1} of {total_chunks}")

                start_time = i * segment_length
                end_time = min((i + 1) * segment_length, total_duration)

                # Re-read just this window from disk rather than slicing
                # the already decoded audio.
                chunk = AudioSegment.from_file(
                    temp_file_path,
                    format=file_type,
                    start_second=start_time / 1000,
                    duration=(end_time - start_time) / 1000
                )

                compressed_segments += self._compress_segment(chunk)

                # Pause between segments (configurable delay).
                time.sleep(self.compression_process_delay)

            # Save compressed audio to MinIO
            compressed_filename = f"{self.document_version.id}_compressed.mp3"
            with io.BytesIO() as compressed_buffer:
                compressed_segments.export(compressed_buffer, format="mp3")
                minio_client.upload_document_file(
                    self.tenant.id,
                    self.document_version.doc_id,
                    self.document_version.language,
                    self.document_version.id,
                    compressed_filename,
                    compressed_buffer.getvalue()
                )

            self._log(f"Saved compressed audio to MinIO: {compressed_filename}")

            return compressed_segments

        except Exception as e:
            self._log(f"Error during audio processing: {str(e)}", level='error')
            raise
        finally:
            os.unlink(temp_file_path)  # Ensure the temporary file is deleted

    def _compress_segment(self, audio_segment):
        """Re-encode one segment to a normalised mono 16 kHz / 32 kbit/s MP3.

        The segment is exported as WAV, piped through ffmpeg (started under
        ``nice -n 19``), and the MP3 output is loaded back into pydub.

        Args:
            audio_segment: pydub ``AudioSegment`` to compress.

        Returns:
            The compressed audio as a pydub ``AudioSegment``.

        Raises:
            RuntimeError: If ffmpeg exits with a non-zero return code.
        """
        with io.BytesIO() as segment_buffer:
            audio_segment.export(segment_buffer, format="wav")
            wav_bytes = segment_buffer.getvalue()

        command = [
            'nice', '-n', '19',
            self.ffmpeg_path,
            '-i', 'pipe:0',
            '-ar', '16000',
            '-ac', '1',
            '-b:a', '32k',
            '-filter:a', 'loudnorm',
            '-f', 'mp3',
            'pipe:1'
        ]

        process = psutil.Popen(
            command,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        stdout, stderr = process.communicate(input=wav_bytes)

        if process.returncode != 0:
            # Decode leniently so odd bytes in ffmpeg's output cannot mask
            # the real failure with a UnicodeDecodeError.
            self._log(f"FFmpeg error: {stderr.decode(errors='replace')}", level='error')
            raise RuntimeError("FFmpeg compression failed")

        with io.BytesIO(stdout) as output_buffer:
            compressed_segment = AudioSegment.from_mp3(output_buffer)

        return compressed_segment

    def _transcribe_audio(self, audio_data):
        """Transcribe audio in fixed-length segments and upload the joined text.

        A segment whose transcription fails is logged and skipped; the
        remaining segments are still processed.

        Args:
            audio_data: pydub ``AudioSegment`` to transcribe (the output of
                ``_compress_audio``).

        Returns:
            The segment transcriptions joined with single spaces, or the
            placeholder ``"No transcription available."`` when nothing was
            transcribed.
        """
        self._log("Starting audio transcription")
        audio = audio_data

        segment_length = self.max_transcription_duration * 1000  # calculate milliseconds
        audio_length = len(audio)
        # Ceiling division, so an exact multiple of the segment length does
        # not report one chunk too many.
        total_chunks = (audio_length + segment_length - 1) // segment_length
        remainder_ms = audio_length % segment_length
        transcriptions = []

        for i, chunk in enumerate(audio[::segment_length]):
            self._log(f'Processing chunk {i + 1} of {total_chunks}')

            # Only a trailing partial chunk is shorter than the maximum.
            if i == total_chunks - 1 and remainder_ms:
                segment_duration = remainder_ms // 1000
            else:
                segment_duration = self.max_transcription_duration

            text = self._transcribe_audio_chunk(chunk, segment_duration)
            if text is not None:
                transcriptions.append(text)

        full_transcription = " ".join(filter(None, transcriptions))

        if not full_transcription:
            self._log("Warning: No transcription was generated", level='warning')
            full_transcription = "No transcription available."

        # Save transcription to MinIO
        transcription_filename = f"{self.document_version.id}_transcription.txt"
        minio_client.upload_document_file(
            self.tenant.id,
            self.document_version.doc_id,
            self.document_version.language,
            self.document_version.id,
            transcription_filename,
            full_transcription.encode('utf-8')
        )

        self._log(f"Saved transcription to MinIO: {transcription_filename}")

        return full_transcription

    def _transcribe_audio_chunk(self, chunk, segment_duration):
        """Export one chunk to a temporary MP3 and return its transcription text.

        Args:
            chunk: pydub ``AudioSegment`` for this chunk.
            segment_duration: Chunk duration in seconds, forwarded to the
                transcription call.

        Returns:
            The transcription text, or ``None`` when the result was empty or
            the transcription step raised (the error is logged, not raised).
        """
        # Reserve a path and close the handle before pydub writes to it.
        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_audio:
            temp_audio_path = temp_audio.name

        try:
            # An export failure propagates to the caller; the outer finally
            # still removes the temporary file.
            chunk.export(temp_audio_path, format="mp3")

            try:
                file_size = os.path.getsize(temp_audio_path)
                self._log(f"Temporary audio file size: {file_size} bytes")

                with open(temp_audio_path, 'rb') as audio_file:
                    file_start = audio_file.read(100)
                    self._log(f"First 100 bytes of audio file: {file_start}")
                    audio_file.seek(0)  # Reset file pointer to the beginning

                    self._log("Calling transcription API")
                    # NOTE(review): model_variables is subscripted like a
                    # mapping in __init__ but called via .transcribe here;
                    # presumably a project wrapper supporting both -- confirm.
                    transcription = self.model_variables.transcribe(
                        file=audio_file,
                        model=self.transcription_model,
                        language=self.document_version.language,
                        response_format='verbose_json',
                        duration=segment_duration,
                    )
                    self._log("Transcription API call completed")

                if not transcription:
                    self._log("Warning: Received empty transcription", level='warning')
                    return None

                # Handle the transcription result based on its type
                if isinstance(transcription, str):
                    self._log(f"Transcription result (string): {transcription[:100]}...")
                    return transcription
                if hasattr(transcription, 'text'):
                    self._log(
                        f"Transcription result (object with 'text' attribute): {transcription.text[:100]}...")
                    return transcription.text

                self._log(f"Transcription result (unknown type): {str(transcription)[:100]}...")
                return str(transcription)

            except Exception as e:
                # Best effort: a failed chunk is logged and skipped.
                self._log(f"Error during transcription: {str(e)}", level='error')
                return None
        finally:
            os.unlink(temp_audio_path)