diff --git a/OS/01/assistant/assistant.py b/OS/01/assistant/assistant.py
index 64e31cb..5f56f79 100644
--- a/OS/01/assistant/assistant.py
+++ b/OS/01/assistant/assistant.py
@@ -6,7 +6,7 @@ Exposes a ws endpoint called /user. Things from there go into the queue. We also
 In a while loop we watch the queue and handle it.
 """
 
-import os
+from starlette.websockets import WebSocketDisconnect
 import ast
 import json
 import time
@@ -21,12 +21,12 @@ from starlette.websockets import WebSocket
 from create_interpreter import create_interpreter
 from stt import stt
 from tts import tts
+from pathlib import Path
 
 # Create interpreter
 interpreter = create_interpreter()
 
-script_dir = os.path.dirname(os.path.abspath(__file__))
-conversation_history_path = os.path.join(script_dir, 'conversations', 'user.json')
+conversation_history_path = Path(__file__).parent / 'conversations' / 'user.json'
 
 # Create Queue objects
 to_user = queue.Queue()
@@ -49,11 +49,16 @@ async def read_computer(item: dict):
 async def websocket_endpoint(websocket: WebSocket):
     await websocket.accept()
     while True:
-        data = await websocket.receive_json()
-        to_assistant.put(data)
-        while not to_user.empty():
-            message = to_user.get()
-            await websocket.send_json(message)
+        try:
+            data = await websocket.receive_json()
+            to_assistant.put(data)
+            while not to_user.empty():
+                message = to_user.get()
+                print("sending a message!")
+                await websocket.send_json(message)
+        except WebSocketDisconnect:
+            pass
+
 
 def queue_listener():
     audio_file = bytearray()
@@ -89,25 +94,32 @@ def queue_listener():
 
         accumulated_text = ""
 
-        for chunk in interpreter.chat(messages):
+        for chunk in interpreter.chat(messages, stream=True):
 
             # Send it to the user
             to_user.put(chunk)
 
             # Speak full sentences out loud
-            if chunk["type"] == "assistant":
+            if chunk["role"] == "assistant" and "content" in chunk:
+                print("Chunk role is assistant and content is present in chunk.")
                 accumulated_text += chunk["content"]
+                print("Accumulated text: ", accumulated_text)
                 sentences = split_into_sentences(accumulated_text)
+                print("Sentences after splitting: ", sentences)
                 if is_full_sentence(sentences[-1]):
+                    print("Last sentence is a full sentence.")
                     for sentence in sentences:
-                        for audio_chunk in tts(sentence):
-                            to_user.put(audio_chunk)
+                        print("Streaming sentence: ", sentence)
+                        stream_tts_to_user(sentence)
                     accumulated_text = ""
+                    print("Reset accumulated text.")
                 else:
+                    print("Last sentence is not a full sentence.")
                     for sentence in sentences[:-1]:
-                        for audio_chunk in tts(sentence):
-                            to_user.put(audio_chunk)
+                        print("Streaming sentence: ", sentence)
+                        stream_tts_to_user(sentence)
                     accumulated_text = sentences[-1]
+                    print("Accumulated text is now the last sentence: ", accumulated_text)
 
             # If we have a new message, save our progress and go back to the top
             if not to_assistant.empty():
@@ -115,6 +127,12 @@
                     json.dump(interpreter.messages, file)
                 break
 
+def stream_tts_to_user(sentence):
+    to_user.put({"role": "assistant", "type": "audio", "format": "audio/mp3", "start": True})
+    audio_bytes = tts(sentence)
+    to_user.put({"role": "assistant", "type": "audio", "format": "audio/mp3", "content": str(audio_bytes)})
+    to_user.put({"role": "assistant", "type": "audio", "format": "audio/mp3", "end": True})
+
 
 # Create a thread for the queue listener
 queue_thread = Thread(target=queue_listener)
diff --git a/OS/01/assistant/stt.py b/OS/01/assistant/stt.py
index 07153bc..7afe1ed 100644
--- a/OS/01/assistant/stt.py
+++ b/OS/01/assistant/stt.py
@@ -38,8 +38,6 @@ def export_audio_to_wav_ffmpeg(audio: bytearray, mime_type: str) -> str:
     output_path = os.path.join(temp_dir, f"output_{datetime.now().strftime('%Y%m%d%H%M%S%f')}.wav")
     ffmpeg.input(input_path).output(output_path, acodec='pcm_s16le', ac=1, ar='16k').run()
 
-    print(f"Temporary file path: {output_path}")
-
     try:
         yield output_path
     finally:
diff --git a/OS/01/assistant/tts.py b/OS/01/assistant/tts.py
index e5254da..d752295 100644
--- a/OS/01/assistant/tts.py
+++ b/OS/01/assistant/tts.py
@@ -2,41 +2,18 @@
 Defines a function which takes text and returns a path to an audio file.
 """
 
-from openai import OpenAI
-import pydub
-import pydub.playback
 import tempfile
-import os
-from datetime import datetime
-from io import BytesIO
+from openai import OpenAI
 
 client = OpenAI()
-chunk_size = 1024
-read_chunk_size = 4096
 
 def tts(text):
-
-    temp_dir = tempfile.gettempdir()
-    output_path = os.path.join(temp_dir, f"output_{datetime.now().strftime('%Y%m%d%H%M%S%f')}.mp3")
-
-    try:
-        with (
-            client.with_streaming_response.audio.speech.create(
-                model="tts-1",
-                voice="alloy",
-                input=text,
-                response_format='mp3',
-                speed=1.2)
-        ) as response:
-            with open(output_path, 'wb') as f:
-                for chunk in response.iter_bytes(chunk_size):
-                    f.write(chunk)
-
-        with open(output_path, 'rb') as f:
-            byte_chunk = f.read(read_chunk_size)
-            yield byte_chunk
-
-        seg = pydub.AudioSegment.from_mp3(output_path)
-        pydub.playback.play(seg)
-    finally:
-        os.remove(output_path)
\ No newline at end of file
+    response = client.audio.speech.create(
+        model="tts-1",
+        voice="alloy",
+        input=text,
+        response_format="mp3"
+    )
+    with tempfile.NamedTemporaryFile() as temp_file:
+        response.stream_to_file(temp_file.name)
+        return temp_file.read()
diff --git a/OS/01/requirements.txt b/OS/01/requirements.txt
index 7022549..0c4cf75 100644
--- a/OS/01/requirements.txt
+++ b/OS/01/requirements.txt
@@ -7,4 +7,5 @@ uvicorn
 websockets
 python-dotenv
 ffmpeg-python
-textual
\ No newline at end of file
+textual
+pydub
\ No newline at end of file
diff --git a/OS/01/user/user.py b/OS/01/user/user.py
index f939fc4..127e1f8 100644
--- a/OS/01/user/user.py
+++ b/OS/01/user/user.py
@@ -6,6 +6,8 @@ import pyaudio
 from queue import Queue
 from pynput import keyboard
 import json
+import pydub
+import ast
 
 # Configuration for Audio Recording
 CHUNK = 1024  # Record in chunks of 1024 samples
@@ -77,22 +79,50 @@ def toggle_recording(state):
 
 async def websocket_communication():
     """Handle WebSocket communication and listen for incoming messages."""
-    async with websockets.connect(WS_URL) as websocket:
-        while True:
-            # Send data from the queue to the server
-            while not data_queue.empty():
-                data = data_queue.get_nowait()
-                await websocket.send(json.dumps(data))
+    while True:
+        try:
+            async with websockets.connect(WS_URL) as websocket:
+
+                print("Press the spacebar to start/stop recording. Press ESC to exit.")
+
+                while True:
+                    # Send data from the queue to the server
+                    while not data_queue.empty():
+                        data = data_queue.get_nowait()
+                        print(f"Sending data to the server: {data}")
+                        await websocket.send(json.dumps(data))
+
+                    # Listen for incoming messages from the server
+                    try:
+                        chunk = await websocket.recv()
+                        print(f"Received from server: {str(chunk)[:100]}")
+
+                        if chunk["type"] == "audio":
+                            print("Received audio data from server.")
+                            if "start" in chunk:
+                                print("Start of audio data received.")
+                                audio_chunks = bytearray()
+                            if "content" in chunk:
+                                print("Audio content received.")
+                                audio_chunks.extend(bytes(ast.literal_eval(chunk["content"])))
+                            if "end" in chunk:
+                                print("End of audio data received.")
+                                with tempfile.NamedTemporaryFile(suffix=".mp3") as f:
+                                    f.write(audio_chunks)
+                                    f.seek(0)
+                                    seg = pydub.AudioSegment.from_mp3(f.name)
+                                    print("Playing received audio.")
+                                    pydub.playback.play(seg)
+
+                    except Exception as e:
+                        print(f"Error receiving data: {e}")
+
+                    print("Sleeping for 0.05 seconds.")
+                    await asyncio.sleep(0.05)
+        except Exception as e:
+            print(f"Websocket not ready, retrying... ({e})")
+            await asyncio.sleep(1)
 
-            # Listen for incoming messages from the server
-            try:
-                incoming_message = await asyncio.wait_for(websocket.recv(), timeout=1.0)
-                print(f"Received from server: {incoming_message}")
-            except asyncio.TimeoutError:
-                # No message received within timeout period
-                pass
-
-            await asyncio.sleep(0.1)
 
 
 def on_press(key):
@@ -101,9 +131,12 @@ def on_press(key):
     """Detect spacebar press."""
     if key == keyboard.Key.space:
         toggle_recording(True)
 
 def on_release(key):
-    """Detect spacebar release."""
+    """Detect spacebar release and ESC key press."""
     if key == keyboard.Key.space:
         toggle_recording(False)
+    elif key == keyboard.Key.esc:
+        print("Exiting...")
+        os._exit(0)
 
 def main():
     # Start the WebSocket communication in a separate asyncio event loop
@@ -112,7 +145,7 @@ def main():
 
     # Keyboard listener for spacebar press/release
     with keyboard.Listener(on_press=on_press, on_release=on_release) as listener:
-        print("Press the spacebar to start/stop recording. Press ESC to exit.")
+        print("In a moment, press the spacebar to start/stop recording. Press ESC to exit.")
        listener.join()
 
     p.terminate()