feat: added Whisper stt

pull/6/head
Shiven Mian 11 months ago
parent a3de4c1286
commit f749cb878e

@ -0,0 +1,52 @@
from fastapi import FastAPI, WebSocket
import uvicorn
import json
from stt import stt
import tempfile
app = FastAPI()
@app.websocket("/user")
async def user(ws: WebSocket):
await ws.accept()
audio_file = bytearray()
mime_type = None
try:
while True:
message = await ws.receive()
if message['type'] == 'websocket.disconnect':
break
if message['type'] == 'websocket.receive':
if 'text' in message:
control_message = json.loads(message['text'])
if control_message.get('action') == 'command' and control_message.get('state') == 'start' and 'mimeType' in control_message:
# This indicates the start of a new audio file
mime_type = control_message.get('mimeType')
elif control_message.get('action') == 'command' and control_message.get('state') == 'end':
# This indicates the end of the audio file
# Process the complete audio file here
transcription = stt(audio_file, mime_type)
await ws.send_json({"transcript": transcription})
print("SENT TRANSCRIPTION!")
# Reset the bytearray for the next audio file
audio_file = bytearray()
mime_type = None
elif 'bytes' in message:
# If it's not a control message, it's part of the audio file
audio_file.extend(message['bytes'])
except Exception as e:
print(f"WebSocket connection closed with exception: {e}")
finally:
await ws.close()
print("WebSocket connection closed")
if __name__ == "__main__":
with tempfile.TemporaryDirectory():
uvicorn.run(app, host="0.0.0.0", port=8000)

@ -0,0 +1,52 @@
from datetime import datetime
import os
import contextlib
import tempfile
import ffmpeg
import subprocess
from openai import OpenAI
client = OpenAI()
def convert_mime_type_to_format(mime_type: str) -> str:
if mime_type == "audio/x-wav" or mime_type == "audio/wav":
return "wav"
if mime_type == "audio/webm":
return "webm"
return mime_type
@contextlib.contextmanager
def export_audio_to_wav_ffmpeg(audio: bytearray, mime_type: str) -> str:
temp_dir = tempfile.gettempdir()
# Create a temporary file with the appropriate extension
input_ext = convert_mime_type_to_format(mime_type)
input_path = os.path.join(temp_dir, f"input_{datetime.now().strftime('%Y%m%d%H%M%S%f')}.{input_ext}")
with open(input_path, 'wb') as f:
f.write(audio)
# Export to wav
output_path = os.path.join(temp_dir, f"output_{datetime.now().strftime('%Y%m%d%H%M%S%f')}.wav")
ffmpeg.input(input_path).output(output_path, acodec='pcm_s16le', ac=1, ar='16k').run()
print(f"Temporary file path: {output_path}")
try:
yield output_path
finally:
os.remove(input_path)
os.remove(output_path)
def stt(audio_bytes: bytearray, mime_type):
with export_audio_to_wav_ffmpeg(audio_bytes, mime_type) as wav_file_path:
audio_file = open(wav_file_path, "rb")
transcript = client.audio.transcriptions.create(
model="whisper-1",
file=audio_file,
response_format="text"
)
print("Exciting transcription result:", transcript)
return transcript

@ -1,4 +1,7 @@
git+https://github.com/KillianLucas/open-interpreter.git
asyncio
pyaudio
pynput
redis
fastapi
uvicorn

@ -0,0 +1,141 @@
"""
Handles everything the user interacts through.
Connects to a websocket at /user. Sends shit to it, and displays/plays the shit it sends back.
For now, just handles a spacebar being pressed for the duration it's pressed,
it should record audio.
SIMPLEST POSSIBLE: Sends that audio to OpenAI whisper, gets the transcript,
sends it to /user in LMC format (role: user, etc)
MOST FUTUREPROOF: Streams chunks of audio to /user, which will then handle stt in stt.py.
"""
import os
import pyaudio
import threading
import asyncio
import websockets
import json
from pynput import keyboard
import wave
import tempfile
from datetime import datetime
# Configuration
chunk = 1024 # Record in chunks of 1024 samples
sample_format = pyaudio.paInt16 # 16 bits per sample
channels = 1 # Stereo
fs = 48000 # Sample rate
p = pyaudio.PyAudio() # Create an interface to PortAudio
frames = [] # Initialize array to store frames
recording = False # Flag to control recording state
ws_chunk_size = 4096 # Websocket stream chunk size
async def start_recording():
global recording
if recording:
return # Avoid multiple starts
recording = True
frames.clear() # Clear existing frames
stream = p.open(format=sample_format,
channels=channels,
rate=fs,
frames_per_buffer=chunk,
input=True)
print("Recording started...")
async with websockets.connect("ws://localhost:8000/user") as websocket:
# Send the start command with mime type
await websocket.send(json.dumps({"action": "command", "state": "start", "mimeType": "audio/wav"}))
while recording:
data = stream.read(chunk)
frames.append(data)
stream.stop_stream()
stream.close()
try:
file_path = save_recording(frames)
with open(file_path, 'rb') as audio_file:
byte_chunk = audio_file.read(ws_chunk_size)
while byte_chunk:
await websocket.send(byte_chunk)
byte_chunk = audio_file.read(ws_chunk_size)
finally:
os.remove(file_path)
# Send the end command
await websocket.send(json.dumps({"action": "command", "state": "end"}))
# Receive a json message and then close the connection
message = await websocket.recv()
print("Received message:", json.loads(message))
print("Recording stopped.")
def save_recording(frames) -> str:
# Save the recorded data as a WAV file
temp_dir = tempfile.gettempdir()
# Create a temporary file with the appropriate extension
output_path = os.path.join(temp_dir, f"input_{datetime.now().strftime('%Y%m%d%H%M%S%f')}.wav")
with wave.open(output_path, 'wb') as wf:
wf.setnchannels(channels)
wf.setsampwidth(p.get_sample_size(sample_format))
wf.setframerate(fs)
wf.writeframes(b''.join(frames))
return output_path
def start_recording_sync():
# Create a new event loop for the thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Run the asyncio event loop
loop.run_until_complete(start_recording())
loop.close()
def stop_recording():
global recording
recording = False
print("Stopped recording")
def toggle_recording():
global recording
if recording:
stop_recording()
else:
# Start recording in a new thread to avoid blocking
print("Starting recording")
threading.Thread(target=start_recording_sync).start()
is_space_pressed = False # Flag to track the state of the spacebar
def on_press(key):
global is_space_pressed
if key == keyboard.Key.space and not is_space_pressed:
is_space_pressed = True
toggle_recording()
def on_release(key):
global is_space_pressed
if key == keyboard.Key.space and is_space_pressed:
is_space_pressed = False
stop_recording()
if key == keyboard.Key.esc:
# Stop listener
return False
# Collect events until released
with keyboard.Listener(on_press=on_press, on_release=on_release) as listener:
with tempfile.TemporaryDirectory():
print("Press the spacebar to start/stop recording. Press ESC to exit.")
listener.join()
p.terminate()

@ -1,13 +0,0 @@
"""
Handles everything the user interacts through.
Connects to a websocket at /user. Sends shit to it, and displays/plays the shit it sends back.
For now, just handles a spacebar being pressed for the duration it's pressed,
it should record audio.
SIMPLEST POSSIBLE: Sends that audio to OpenAI whisper, gets the transcript,
sends it to /user in LMC format (role: user, etc)
MOST FUTUREPROOF: Streams chunks of audio to /user, which will then handle stt in stt.py.
"""
Loading…
Cancel
Save