|
|
|
@ -12,6 +12,8 @@ import subprocess
|
|
|
|
|
|
|
|
|
|
import os
|
|
|
|
|
import subprocess
|
|
|
|
|
import platform
|
|
|
|
|
import urllib.request
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Stt:
|
|
|
|
@ -23,7 +25,6 @@ class Stt:
|
|
|
|
|
return stt(self.service_directory, audio_file_path)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def install(service_dir):
|
|
|
|
|
|
|
|
|
|
### INSTALL
|
|
|
|
@ -42,27 +43,29 @@ def install(service_dir):
|
|
|
|
|
# Check if whisper-rust executable exists before attempting to build
|
|
|
|
|
if not os.path.isfile(os.path.join(WHISPER_RUST_PATH, "target/release/whisper-rust")):
|
|
|
|
|
# Check if Rust is installed. Needed to build whisper executable
|
|
|
|
|
rust_check = subprocess.call('command -v rustc', shell=True)
|
|
|
|
|
if rust_check != 0:
|
|
|
|
|
rustc_path = shutil.which('rustc')
|
|
|
|
|
if rustc_path is None:
|
|
|
|
|
print("Rust is not installed or is not in system PATH. Please install Rust before proceeding.")
|
|
|
|
|
exit(1)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Build Whisper Rust executable if not found
|
|
|
|
|
subprocess.call('cargo build --release', shell=True)
|
|
|
|
|
subprocess.run(['cargo', 'build', '--release'], check=True)
|
|
|
|
|
else:
|
|
|
|
|
print("Whisper Rust executable already exists. Skipping build.")
|
|
|
|
|
|
|
|
|
|
WHISPER_MODEL_PATH = os.path.join(service_dir, "model")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
WHISPER_MODEL_NAME = os.getenv('WHISPER_MODEL_NAME', 'ggml-tiny.en.bin')
|
|
|
|
|
WHISPER_MODEL_URL = os.getenv('WHISPER_MODEL_URL', 'https://huggingface.co/ggerganov/whisper.cpp/resolve/main/')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if not os.path.isfile(os.path.join(WHISPER_MODEL_PATH, WHISPER_MODEL_NAME)):
|
|
|
|
|
os.makedirs(WHISPER_MODEL_PATH, exist_ok=True)
|
|
|
|
|
subprocess.call(f'curl -L "{WHISPER_MODEL_URL}{WHISPER_MODEL_NAME}" -o "{os.path.join(WHISPER_MODEL_PATH, WHISPER_MODEL_NAME)}"', shell=True)
|
|
|
|
|
urllib.request.urlretrieve(f"{WHISPER_MODEL_URL}{WHISPER_MODEL_NAME}",
|
|
|
|
|
os.path.join(WHISPER_MODEL_PATH, WHISPER_MODEL_NAME))
|
|
|
|
|
else:
|
|
|
|
|
print("Whisper model already exists. Skipping download.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def convert_mime_type_to_format(mime_type: str) -> str:
|
|
|
|
|
if mime_type == "audio/x-wav" or mime_type == "audio/wav":
|
|
|
|
|
return "wav"
|
|
|
|
@ -73,6 +76,7 @@ def convert_mime_type_to_format(mime_type: str) -> str:
|
|
|
|
|
|
|
|
|
|
return mime_type
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@contextlib.contextmanager
|
|
|
|
|
def export_audio_to_wav_ffmpeg(audio: bytearray, mime_type: str) -> str:
|
|
|
|
|
temp_dir = tempfile.gettempdir()
|
|
|
|
@ -105,10 +109,12 @@ def export_audio_to_wav_ffmpeg(audio: bytearray, mime_type: str) -> str:
|
|
|
|
|
os.remove(input_path)
|
|
|
|
|
os.remove(output_path)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_command(command):
|
|
|
|
|
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
|
|
|
|
return result.stdout, result.stderr
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_transcription_file(service_directory, wav_file_path: str):
|
|
|
|
|
local_path = os.path.join(service_directory, 'model')
|
|
|
|
|
whisper_rust_path = os.path.join(service_directory, 'whisper-rust', 'target', 'release')
|
|
|
|
@ -124,14 +130,15 @@ def get_transcription_file(service_directory, wav_file_path: str):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def stt_wav(service_directory, wav_file_path: str):
|
|
|
|
|
temp_dir = tempfile.gettempdir()
|
|
|
|
|
output_path = os.path.join(temp_dir, f"output_stt_{datetime.now().strftime('%Y%m%d%H%M%S%f')}.wav")
|
|
|
|
|
ffmpeg.input(wav_file_path).output(output_path, acodec='pcm_s16le', ac=1, ar='16k').run()
|
|
|
|
|
try:
|
|
|
|
|
transcript = get_transcription_file(service_directory, output_path)
|
|
|
|
|
finally:
|
|
|
|
|
os.remove(output_path)
|
|
|
|
|
return transcript
|
|
|
|
|
temp_dir = tempfile.gettempdir()
|
|
|
|
|
output_path = os.path.join(temp_dir, f"output_stt_{datetime.now().strftime('%Y%m%d%H%M%S%f')}.wav")
|
|
|
|
|
ffmpeg.input(wav_file_path).output(output_path, acodec='pcm_s16le', ac=1, ar='16k').run()
|
|
|
|
|
try:
|
|
|
|
|
transcript = get_transcription_file(service_directory, output_path)
|
|
|
|
|
finally:
|
|
|
|
|
os.remove(output_path)
|
|
|
|
|
return transcript
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def stt(service_directory, input_data):
|
|
|
|
|
return stt_wav(service_directory, input_data)
|
|
|
|
|
return stt_wav(service_directory, input_data)
|
|
|
|
|