diff --git a/01OS/01OS/clients/base_device.py b/01OS/01OS/clients/base_device.py index 678f58b..60f3cf3 100644 --- a/01OS/01OS/clients/base_device.py +++ b/01OS/01OS/clients/base_device.py @@ -25,6 +25,7 @@ from datetime import datetime import cv2 import base64 from interpreter import interpreter # Just for code execution. Maybe we should let people do from interpreter.computer import run? +# In the future, I guess kernel watching code should be elsewhere? Somewhere server / client agnostic? from ..server.utils.kernel import put_kernel_messages_into_queue from ..server.utils.get_system_info import get_system_info from ..server.stt.stt import stt_wav @@ -33,6 +34,11 @@ from ..server.utils.logs import setup_logging from ..server.utils.logs import logger setup_logging() + +from ..utils.accumulator import Accumulator + +accumulator = Accumulator() + # Configuration for Audio Recording CHUNK = 1024 # Record in chunks of 1024 samples FORMAT = pyaudio.paInt16 # 16 bits per sample @@ -52,14 +58,13 @@ current_platform = get_system_info() # Initialize PyAudio p = pyaudio.PyAudio() -import asyncio - send_queue = queue.Queue() class Device: def __init__(self): self.pressed_keys = set() self.captured_images = [] + self.audiosegments = [] def fetch_image_from_camera(self, camera_index=CAMERA_DEVICE_INDEX): """Captures an image from the specified camera device and saves it to a temporary file. Adds the image to the captured_images list.""" @@ -114,12 +119,24 @@ class Device: self.add_image_to_send_queue(image_path) self.captured_images.clear() # Clear the list after sending + + async def play_audiosegments(self): + """Plays them sequentially.""" + while True: + try: + for audio in self.audiosegments: + play(audio) + self.audiosegments.remove(audio) + await asyncio.sleep(0.1) + except: + traceback.print_exc() + def record_audio(self): if os.getenv('STT_RUNNER') == "server": # STT will happen on the server. we're sending audio. - send_queue.put({"role": "user", "type": "audio", "format": "audio/wav", "start": True}) + send_queue.put({"role": "user", "type": "audio", "format": "bytes.wav", "start": True}) elif os.getenv('STT_RUNNER') == "client": # STT will happen here, on the client. we're sending text. send_queue.put({"role": "user", "type": "message", "start": True}) @@ -155,8 +172,8 @@ class Device: send_queue.put({"role": "user", "type": "message", "content": "stop"}) send_queue.put({"role": "user", "type": "message", "end": True}) else: - send_queue.put({"role": "user", "type": "audio", "format": "audio/wav", "content": ""}) - send_queue.put({"role": "user", "type": "audio", "format": "audio/wav", "end": True}) + send_queue.put({"role": "user", "type": "audio", "format": "bytes.wav", "content": ""}) + send_queue.put({"role": "user", "type": "audio", "format": "bytes.wav", "end": True}) else: self.queue_all_captured_images() @@ -170,9 +187,9 @@ class Device: with open(wav_path, 'rb') as audio_file: byte_data = audio_file.read(CHUNK) while byte_data: - send_queue.put({"role": "user", "type": "audio", "format": "audio/wav", "content": str(byte_data)}) + send_queue.put(byte_data) byte_data = audio_file.read(CHUNK) - send_queue.put({"role": "user", "type": "audio", "format": "audio/wav", "end": True}) + send_queue.put({"role": "user", "type": "audio", "format": "bytes.wav", "end": True}) if os.path.exists(wav_path): os.remove(wav_path) @@ -215,8 +232,12 @@ class Device: async def message_sender(self, websocket): while True: message = await asyncio.get_event_loop().run_in_executor(None, send_queue.get) - await websocket.send(json.dumps(message)) + if isinstance(message, bytes): + await websocket.send(message) + else: + await websocket.send(json.dumps(message)) send_queue.task_done() + await asyncio.sleep(0.01) async def websocket_communication(self, WS_URL): while True: @@ -229,52 +250,42 @@ class Device: asyncio.create_task(self.message_sender(websocket)) - initial_message = {"role": None, "type": None, "format": None, "content": None} - message_so_far = initial_message - while True: - message = await websocket.recv() + await asyncio.sleep(0.01) + chunk = await websocket.recv() - logger.debug(f"Got this message from the server: {type(message)} {message}") + logger.debug(f"Got this message from the server: {type(chunk)} {chunk}") - if type(message) == str: - message = json.loads(message) + if type(chunk) == str: + chunk = json.loads(chunk) - if message.get("end"): - logger.debug(f"Complete message from the server: {message_so_far}") - logger.info("\n") - message_so_far = initial_message + message = accumulator.accumulate(chunk) + if message == None: + # Will be None until we have a full message ready + continue - if "content" in message: - print(message['content'], end="", flush=True) - if any(message_so_far[key] != message[key] for key in message_so_far if key != "content"): - message_so_far = message - else: - message_so_far["content"] += message["content"] + # At this point, we have our message - if message["type"] == "audio" and "content" in message: - audio_bytes = bytes(ast.literal_eval(message["content"])) + if message["type"] == "audio" and message["format"].startswith("bytes"): # Convert bytes to audio file - audio_file = io.BytesIO(audio_bytes) - audio = AudioSegment.from_mp3(audio_file) - - # Play the audio - play(audio) + # Format will be bytes.wav or bytes.opus + audio_bytes = io.BytesIO(message["content"]) + audio = AudioSegment.from_file(audio_bytes, codec=message["format"].split(".")[1]) - await asyncio.sleep(1) + self.audiosegments.append(audio) # Run the code if that's the client's job if os.getenv('CODE_RUNNER') == "client": if message["type"] == "code" and "end" in message: - language = message_so_far["format"] - code = message_so_far["content"] + language = message["format"] + code = message["content"] result = interpreter.computer.run(language, code) send_queue.put(result) except: - # traceback.print_exc() + traceback.print_exc() logger.info(f"Connecting to `{WS_URL}`...") await asyncio.sleep(2) @@ -291,6 +302,7 @@ class Device: if os.getenv('CODE_RUNNER') == "client": asyncio.create_task(put_kernel_messages_into_queue(send_queue)) + asyncio.create_task(self.play_audiosegments()) # If Raspberry Pi, add the button listener, otherwise use the spacebar if current_platform.startswith("raspberry-pi"): diff --git a/01OS/01OS/server/server.py b/01OS/01OS/server/server.py index 62ab6d8..9710fd5 100644 --- a/01OS/01OS/server/server.py +++ b/01OS/01OS/server/server.py @@ -9,9 +9,9 @@ import traceback import re from fastapi import FastAPI from fastapi.responses import PlainTextResponse -from starlette.websockets import WebSocket +from starlette.websockets import WebSocket, WebSocketDisconnect from .stt.stt import stt_bytes -from .tts.tts import tts +from .tts.tts import stream_tts from pathlib import Path import asyncio import urllib.parse @@ -19,11 +19,13 @@ from .utils.kernel import put_kernel_messages_into_queue from .i import configure_interpreter from interpreter import interpreter import ngrok +from ..utils.accumulator import Accumulator from .utils.logs import setup_logging from .utils.logs import logger setup_logging() +accumulator = Accumulator() app = FastAPI() @@ -105,54 +107,89 @@ async def websocket_endpoint(websocket: WebSocket): async def receive_messages(websocket: WebSocket): while True: - data = await websocket.receive_json() - if data["role"] == "computer": - from_computer.put(data) # To be handled by interpreter.computer.run - elif data["role"] == "user": - await from_user.put(data) - else: - raise("Unknown role:", data) + try: + try: + data = await websocket.receive() + except Exception as e: + print(str(e)) + return + if 'text' in data: + try: + data = json.loads(data['text']) + if data["role"] == "computer": + from_computer.put(data) # To be handled by interpreter.computer.run + elif data["role"] == "user": + await from_user.put(data) + else: + raise("Unknown role:", data) + except json.JSONDecodeError: + pass # data is not JSON, leave it as is + elif 'bytes' in data: + data = data['bytes'] # binary data + await from_user.put(data) + except WebSocketDisconnect as e: + if e.code == 1000: + logger.info("Websocket connection closed normally.") + return + else: + raise + async def send_messages(websocket: WebSocket): while True: message = await to_device.get() logger.debug(f"Sending to the device: {type(message)} {message}") - await websocket.send_json(message) + + try: + if isinstance(message, dict): + await websocket.send_json(message) + elif isinstance(message, bytes): + await websocket.send_bytes(message) + else: + raise TypeError("Message must be a dict or bytes") + except: + # Make sure to put the message back in the queue if you failed to send it + await to_device.put(message) + raise async def listener(): - audio_bytes = bytearray() + while True: while True: if not from_user.empty(): - message = await from_user.get() + chunk = await from_user.get() break elif not from_computer.empty(): - message = from_computer.get() + chunk = from_computer.get() break await asyncio.sleep(1) - if type(message) == str: - message = json.loads(message) + - # Hold the audio in a buffer. If it's ready (we got end flag, stt it) - if message["type"] == "audio": - if "content" in message: - audio_bytes.extend(bytes(ast.literal_eval(message["content"]))) - if "end" in message: - content = stt_bytes(audio_bytes, message["format"]) - if content == None: # If it was nothing / silence - continue - audio_bytes = bytearray() - message = {"role": "user", "type": "message", "content": content} - else: + message = accumulator.accumulate(chunk) + if message == None: + # Will be None until we have a full message ready + continue + + # print(str(message)[:1000]) + + # At this point, we have our message + + if message["type"] == "audio" and message["format"].startswith("bytes"): + + if not message["content"]: # If it was nothing / silence continue - # Ignore flags, we only needed them for audio ^ - if "content" not in message or message["content"] == None: - continue + # Convert bytes to audio file + # Format will be bytes.wav or bytes.opus + mime_type = "audio/" + message["format"].split(".")[1] + text = stt_bytes(message["content"], mime_type) + message = {"role": "user", "type": "message", "content": text} + + # At this point, we have only text messages # Custom stop message will halt us - if message["content"].lower().strip(".,!") == "stop": + if message["content"].lower().strip(".,! ") == "stop": continue # Load, append, and save conversation history @@ -173,19 +210,31 @@ async def listener(): # Yield to the event loop, so you actually send it out await asyncio.sleep(0.01) - # Speak full sentences out loud - if chunk["role"] == "assistant" and "content" in chunk: - accumulated_text += chunk["content"] - sentences = split_into_sentences(accumulated_text) - if is_full_sentence(sentences[-1]): - for sentence in sentences: - await stream_or_play_tts(sentence) - accumulated_text = "" - else: - for sentence in sentences[:-1]: - await stream_or_play_tts(sentence) - accumulated_text = sentences[-1] - + if os.getenv('TTS_RUNNER') == "server": + # Speak full sentences out loud + if chunk["role"] == "assistant" and "content" in chunk: + accumulated_text += chunk["content"] + sentences = split_into_sentences(accumulated_text) + + # If we're going to speak, say we're going to stop sending text. + # This should be fixed probably, we should be able to do both in parallel, or only one. + if any(is_full_sentence(sentence) for sentence in sentences): + await to_device.put({"role": "assistant", "type": "message", "end": True}) + + if is_full_sentence(sentences[-1]): + for sentence in sentences: + await stream_tts_to_device(sentence) + accumulated_text = "" + else: + for sentence in sentences[:-1]: + await stream_tts_to_device(sentence) + accumulated_text = sentences[-1] + + # If we're going to speak, say we're going to stop sending text. + # This should be fixed probably, we should be able to do both in parallel, or only one. + if any(is_full_sentence(sentence) for sentence in sentences): + await to_device.put({"role": "assistant", "type": "message", "start": True}) + # If we have a new message, save our progress and go back to the top if not from_user.empty(): @@ -215,19 +264,12 @@ async def listener(): break else: with open(conversation_history_path, 'w') as file: - json.dump(interpreter.messages, file, indent=4) - - -async def stream_or_play_tts(sentence): - - if os.getenv('TTS_RUNNER') == "server": - tts(sentence, play_audio=True) - else: - await to_device.put({"role": "assistant", "type": "audio", "format": "audio/mp3", "start": True}) - audio_bytes = tts(sentence, play_audio=False) - await to_device.put({"role": "assistant", "type": "audio", "format": "audio/mp3", "content": str(audio_bytes)}) - await to_device.put({"role": "assistant", "type": "audio", "format": "audio/mp3", "end": True}) + json.dump(interpreter.messages, file, indent=4) +async def stream_tts_to_device(sentence): + for chunk in stream_tts(sentence): + await to_device.put(chunk) + async def setup_ngrok(ngrok_auth_token, parsed_url): # Set up Ngrok logger.info("Setting up Ngrok") diff --git a/01OS/01OS/server/tts/tts.py b/01OS/01OS/server/tts/tts.py index 836a1dd..6106966 100644 --- a/01OS/01OS/server/tts/tts.py +++ b/01OS/01OS/server/tts/tts.py @@ -12,27 +12,28 @@ import os import subprocess import tempfile from pydub import AudioSegment -from pydub.playback import play -import simpleaudio as sa client = OpenAI() -def tts(text, play_audio): +chunk_size = 1024 + +def stream_tts(text): + """ + A generator that streams tts as LMC messages. + """ if os.getenv('ALL_LOCAL') == 'False': response = client.audio.speech.create( model="tts-1", voice="alloy", input=text, - response_format="mp3" + response_format="opus" ) - with tempfile.NamedTemporaryFile(suffix=".mp3") as temp_file: + with tempfile.NamedTemporaryFile(suffix=".opus") as temp_file: response.stream_to_file(temp_file.name) - - if play_audio: - audio = AudioSegment.from_mp3(temp_file.name) - play_audiosegment(audio) - return temp_file.read() + audio_bytes = temp_file.read() + file_type = "bytes.opus" + else: with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_file: output_file = temp_file.name @@ -43,13 +44,19 @@ def tts(text, play_audio): '--output_file', output_file ], input=text, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - if play_audio: - audio = AudioSegment.from_wav(temp_file.name) - play_audiosegment(audio) - return temp_file.read() + audio_bytes = temp_file.read() + file_type = "bytes.wav" + + # Stream the audio + yield {"role": "assistant", "type": "audio", "format": file_type, "start": True} + for i in range(0, len(audio_bytes), chunk_size): + chunk = audio_bytes[i:i+chunk_size] + yield chunk + yield {"role": "assistant", "type": "audio", "format": file_type, "end": True} def play_audiosegment(audio): """ + UNUSED the default makes some pops. this fixes that """ @@ -73,3 +80,6 @@ def play_audiosegment(audio): # Wait for the playback to finish play_obj.wait_done() + # Delete the wav file + os.remove("output_audio.wav") + diff --git a/01OS/01OS/utils/accumulator.py b/01OS/01OS/utils/accumulator.py new file mode 100644 index 0000000..93f9c2f --- /dev/null +++ b/01OS/01OS/utils/accumulator.py @@ -0,0 +1,40 @@ +class Accumulator: + def __init__(self): + self.template = {"role": None, "type": None, "format": None, "content": None} + self.message = self.template + + def accumulate(self, chunk): + print(str(chunk)[:100]) + if type(chunk) == dict: + + if "format" in chunk and chunk["format"] == "active_line": + # We don't do anything with these + return None + + if "start" in chunk: + self.message = chunk + self.message.pop("start") + return None + + if "content" in chunk: + if any(self.message[key] != chunk[key] for key in self.message if key != "content"): + self.message = chunk + if "content" not in self.message: + self.message["content"] = chunk["content"] + else: + self.message["content"] += chunk["content"] + return None + + if "end" in chunk: + # We will proceed + message = self.message + self.message = self.template + return message + + if type(chunk) == bytes: + if "content" not in self.message or type(self.message["content"]) != bytes: + self.message["content"] = b"" + self.message["content"] += chunk + return None + + \ No newline at end of file diff --git a/01OS/start.sh b/01OS/start.sh index fc7417f..36b7ec6 100755 --- a/01OS/start.sh +++ b/01OS/start.sh @@ -99,17 +99,16 @@ if [[ "$ALL_LOCAL" == "True" ]]; then curl -L "${PIPER_URL}${PIPER_ASSETNAME}" -o "${PIPER_ASSETNAME}" tar -xvzf $PIPER_ASSETNAME cd piper + curl -OL "${PIPER_VOICE_URL}${PIPER_VOICE_NAME}" + curl -OL "${PIPER_VOICE_URL}${PIPER_VOICE_NAME}.json" if [ "$OS" = "macos" ]; then if [ "$ARCH" = "x64" ]; then softwareupdate --install-rosetta --agree-to-license fi PIPER_PHONEMIZE_ASSETNAME="piper-phonemize_${OS}_${ARCH}.tar.gz" PIPER_PHONEMIZE_URL="https://github.com/rhasspy/piper-phonemize/releases/latest/download/" - curl -OL "${PIPER_PHONEMIZE_URL}${PIPER_PHONEMIZE_ASSETNAME}" tar -xvzf $PIPER_PHONEMIZE_ASSETNAME - curl -OL "${PIPER_VOICE_URL}${PIPER_VOICE_NAME}" - curl -OL "${PIPER_VOICE_URL}${PIPER_VOICE_NAME}.json" PIPER_DIR=`pwd` install_name_tool -change @rpath/libespeak-ng.1.dylib "${PIPER_DIR}/piper-phonemize/lib/libespeak-ng.1.dylib" "${PIPER_DIR}/piper" install_name_tool -change @rpath/libonnxruntime.1.14.1.dylib "${PIPER_DIR}/piper-phonemize/lib/libonnxruntime.1.14.1.dylib" "${PIPER_DIR}/piper" diff --git a/TASKS.md b/TASKS.md index fa1971e..4f6e4e0 100644 --- a/TASKS.md +++ b/TASKS.md @@ -17,10 +17,9 @@ - [ ] Computer: "Great! What's the next step?" .... - [ ] Repeat until all steps of skill are completed - [ ] Save skill as a function next() steps through user's steps - - [ ] Expose ^ via `interpreter --teach`. + - [ ] Expose ^ via `01 --teach`. - [ ] pip install 01 - - [ ] Add `interpreter --server --expose`. - - [ ] Include the 01's --server in the next OI update. + - [ ] Add `01 --server --expose`. - [ ] Add --server --expose which will expose the server via something like Ngrok, display the public URL and a password, so the 01 Light can connect to it. This will let people use OI on their computer via their Light — i.e. "Check my emails" will run Applescript on their home computer. - [ ] Sync Interpreter/Computer between code blocks - [ ] New default dynamic system message with computer API + skills.