diff --git a/OS/01/conversations/user.json b/OS/01/conversations/user.json index 89007e9..32c2cb4 100644 --- a/OS/01/conversations/user.json +++ b/OS/01/conversations/user.json @@ -1 +1 @@ -[{"role": "user", "type": "message", "content": "\ub2e4\uc74c \uc601\uc0c1\uc5d0\uc11c \ub9cc\ub098\uc694!\n"}] \ No newline at end of file +[{"role": "user", "type": "message", "content": "Yeah, it's explaining why you have to be a paramedic.\n"}, {"role": "user", "type": "message", "content": "\uc5b4\ub9b4\ub54c \uad1c\ucc2e\uc558\ub294\ub370 \uc544 \uadf8\ub798\uc11c \uc544\uce68\uc5d0 \uc9c4\uc9dc \uc548\uac00\uc9c0\uace0 \uc654\ub098\ubd10\uc694 \uc57c \ub098 \uc6ec\ub9cc\ud07c \ub9db\uc788\ub294\ub370\n"}, {"role": "user", "type": "message", "content": "Like, you'd have to go, like, out of houses.\n"}] \ No newline at end of file diff --git a/OS/01/device.py b/OS/01/device.py index a6a128b..080cb54 100644 --- a/OS/01/device.py +++ b/OS/01/device.py @@ -6,6 +6,7 @@ from starlette.websockets import WebSocket from queue import Queue from pynput import keyboard import json +import traceback import websockets import queue import pydub @@ -13,11 +14,13 @@ import ast from pydub import AudioSegment from pydub.playback import play import io +import time import wave import tempfile from datetime import datetime -from utils.check_filtered_kernel import check_filtered_kernel from interpreter import interpreter # Just for code execution. Maybe we should let people do from interpreter.computer import run? +from utils.put_kernel_messages_into_queue import put_kernel_messages_into_queue +from stt import stt_wav # Configuration for Audio Recording CHUNK = 1024 # Record in chunks of 1024 samples @@ -36,6 +39,16 @@ if not WS_URL: p = pyaudio.PyAudio() def record_audio(): + + if os.getenv('STT_RUNNER') == "server": + # STT will happen on the server. we're sending audio. + send_queue.put({"role": "user", "type": "audio", "format": "audio/wav", "start": True}) + elif os.getenv('STT_RUNNER') == "device": + # STT will happen here, on the device. we're sending text. + send_queue.put({"role": "user", "type": "message", "start": True}) + else: + raise Exception("STT_RUNNER must be set to either 'device' or 'server'.") + """Record audio from the microphone and add it to the queue.""" stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK) print("Recording started...") @@ -64,8 +77,20 @@ def record_audio(): while byte_data: send_queue.put({"role": "user", "type": "audio", "format": "audio/wav", "content": str(byte_data)}) byte_data = audio_file.read(CHUNK) + + if os.getenv('STT_RUNNER') == "device": + text = stt_wav(wav_path) + send_queue.put({"role": "user", "type": "message", "content": text}) + + if os.getenv('STT_RUNNER') == "server": + # STT will happen on the server. we sent audio. + send_queue.put({"role": "user", "type": "audio", "format": "audio/wav", "end": True}) + elif os.getenv('STT_RUNNER') == "device": + # STT will happen here, on the device. we sent text. + send_queue.put({"role": "user", "type": "message", "end": True}) - send_queue.put({"role": "user", "type": "audio", "format": "audio/wav", "end": True}) + if os.path.exists(wav_path): + os.remove(wav_path) def toggle_recording(state): @@ -114,11 +139,13 @@ async def websocket_communication(WS_URL): async for message in websocket: + print(message) + if "content" in message_so_far: if any(message_so_far[key] != message[key] for key in message_so_far): message_so_far = message else: - message_so_far["content"] += message + message_so_far["content"] += message["content"] if message["type"] == "audio" and "content" in message: audio_bytes = bytes(ast.literal_eval(message["content"])) @@ -139,22 +166,25 @@ async def websocket_communication(WS_URL): code = message_so_far["content"] result = interpreter.computer.run(language, code) send_queue.put(result) - - + except: + traceback.print_exc() print(f"Connecting to `{WS_URL}`...") await asyncio.sleep(2) + -def main(): - # Start the WebSocket communication in a separate asyncio event loop - ws_thread = threading.Thread(target=lambda: asyncio.run(websocket_communication(WS_URL)), daemon=True) - ws_thread.start() +if __name__ == "__main__": + async def main(): + # Start the WebSocket communication + asyncio.create_task(websocket_communication(WS_URL)) - # Keyboard listener for spacebar press/release - with keyboard.Listener(on_press=on_press, on_release=on_release) as listener: - listener.join() + # Start watching the kernel if it's your job to do that + if os.getenv('CODE_RUNNER') == "device": + asyncio.create_task(put_kernel_messages_into_queue(send_queue)) - p.terminate() + # Keyboard listener for spacebar press/release + listener = keyboard.Listener(on_press=on_press, on_release=on_release) + listener.start() -if __name__ == "__main__": - main() \ No newline at end of file + asyncio.run(main()) + p.terminate() \ No newline at end of file diff --git a/OS/01/server.py b/OS/01/server.py index 16f2a9e..ad31c9c 100644 --- a/OS/01/server.py +++ b/OS/01/server.py @@ -4,19 +4,22 @@ import json import time import queue import os +import traceback from queue import Queue from threading import Thread +import threading import uvicorn import re from fastapi import FastAPI from threading import Thread from starlette.websockets import WebSocket -from stt import stt +from stt import stt_bytes from tts import tts from pathlib import Path import asyncio -from i import configure_interpreter import urllib.parse +from utils.put_kernel_messages_into_queue import put_kernel_messages_into_queue +from i import configure_interpreter from interpreter import interpreter app = FastAPI() @@ -30,10 +33,10 @@ def is_full_sentence(text): def split_into_sentences(text): return re.split(r'(?<=[.!?])\s+', text) -# Global queues -receive_queue = queue.Queue() -send_queue = queue.Queue() -recieve_computer_queue = queue.Queue() # Just for computer messages from the device +# Queues +from_computer = queue.Queue() # Just for computer messages from the device. Sync queue because interpreter.run is synchronous +from_user = asyncio.Queue() # Just for user messages from the device. +to_device = asyncio.Queue() # For messages we send. # Switch code executor to device if that's set @@ -56,14 +59,14 @@ if os.getenv('CODE_RUNNER') == "device": # Unless it was just sent to the device, send it wrapped in flags if not (interpreter.messages and interpreter.messages[-1] == message): - send_queue.put({"role": "assistant", "type": "code", "format": "python", "start": True}) - send_queue.put(message) - send_queue.put({"role": "assistant", "type": "code", "format": "python", "end": True}) + to_device.put({"role": "assistant", "type": "code", "format": "python", "start": True}) + to_device.put(message) + to_device.put({"role": "assistant", "type": "code", "format": "python", "end": True}) # Stream the response print("Waiting for the device to respond...") while True: - chunk = recieve_computer_queue.get() + chunk = from_computer.get() print("Server recieved from device:", chunk) if "end" in chunk: break @@ -87,47 +90,52 @@ async def websocket_endpoint(websocket: WebSocket): await websocket.accept() receive_task = asyncio.create_task(receive_messages(websocket)) send_task = asyncio.create_task(send_messages(websocket)) - await asyncio.gather(receive_task, send_task) + try: + await asyncio.gather(receive_task, send_task) + except Exception as e: + traceback.print_exc() + print(f"Connection lost. Error: {e}") async def receive_messages(websocket: WebSocket): while True: - data = await websocket.receive_text() - if type(data) == dict and data["role"] == "computer": - recieve_computer_queue.put(data) # To be handled by interpreter.computer.run + data = await websocket.receive_json() + if data["role"] == "computer": + from_computer.put(data) # To be handled by interpreter.computer.run + elif data["role"] == "user": + await from_user.put(data) else: - receive_queue.put(data) + raise("Unknown role:", data) async def send_messages(websocket: WebSocket): while True: - message = await asyncio.get_event_loop().run_in_executor(None, send_queue.get) + message = await to_device.get() print(message) await websocket.send_json(message) -def queue_listener(): - audio_file = bytearray() +async def user_listener(): + audio_bytes = bytearray() while True: - # Check 10x a second for new messages - while receive_queue.empty(): - time.sleep(0.1) - message = receive_queue.get() - - message = json.loads(message) + message = await from_user.get() # Hold the audio in a buffer. If it's ready (we got end flag, stt it) if message["type"] == "audio": if "content" in message: - audio_file.extend(bytes(ast.literal_eval(message["content"]))) + audio_bytes.extend(bytes(ast.literal_eval(message["content"]))) if "end" in message: - content = stt(audio_file, message["format"]) + content = stt_bytes(audio_bytes, message["format"]) if content == None: # If it was nothing / silence continue - audio_file = bytearray() + audio_bytes = bytearray() message = {"role": "user", "type": "message", "content": content} else: continue + # Ignore flags, we only needed them for audio ^ + if "content" not in message: + continue + # Custom stop message will halt us - if message.get("content") and message.get("content").lower().strip(".,!") == "stop": + if message["content"].lower().strip(".,!") == "stop": continue # Load, append, and save conversation history @@ -142,53 +150,59 @@ def queue_listener(): for chunk in interpreter.chat(messages, stream=True): # Send it to the user - send_queue.put(chunk) + await to_device.put(chunk) # Speak full sentences out loud if chunk["role"] == "assistant" and "content" in chunk: - print("Chunk role is assistant and content is present in chunk.") accumulated_text += chunk["content"] - print("Accumulated text: ", accumulated_text) sentences = split_into_sentences(accumulated_text) - print("Sentences after splitting: ", sentences) if is_full_sentence(sentences[-1]): - print("Last sentence is a full sentence.") for sentence in sentences: - print("Streaming sentence: ", sentence) - stream_tts_to_user(sentence) + await stream_or_play_tts(sentence) accumulated_text = "" - print("Reset accumulated text.") else: - print("Last sentence is not a full sentence.") for sentence in sentences[:-1]: - print("Streaming sentence: ", sentence) - stream_tts_to_user(sentence) + await stream_or_play_tts(sentence) accumulated_text = sentences[-1] - print("Accumulated text is now the last sentence: ", accumulated_text) # If we have a new message, save our progress and go back to the top - if not receive_queue.empty(): + if not from_user.empty(): with open(conversation_history_path, 'w') as file: json.dump(interpreter.messages, file) break -def stream_tts_to_user(sentence): - send_queue.put({"role": "assistant", "type": "audio", "format": "audio/mp3", "start": True}) - audio_bytes = tts(sentence) - send_queue.put({"role": "assistant", "type": "audio", "format": "audio/mp3", "content": str(audio_bytes)}) - send_queue.put({"role": "assistant", "type": "audio", "format": "audio/mp3", "end": True}) +async def stream_or_play_tts(sentence): + + if os.getenv('TTS_RUNNER') == "server": + tts(sentence, play_audio=True) + else: + await to_device.put({"role": "assistant", "type": "audio", "format": "audio/mp3", "start": True}) + audio_bytes = tts(sentence, play_audio=False) + await to_device.put({"role": "assistant", "type": "audio", "format": "audio/mp3", "content": str(audio_bytes)}) + await to_device.put({"role": "assistant", "type": "audio", "format": "audio/mp3", "end": True}) -# Create a thread for the queue listener -queue_thread = Thread(target=queue_listener) -# Start the queue listener thread -queue_thread.start() +from uvicorn import Config, Server # Run the FastAPI app if __name__ == "__main__": - server_url = os.getenv('SERVER_URL') - if not server_url: - raise ValueError("The environment variable SERVER_URL is not set. Please set it to proceed.") - parsed_url = urllib.parse.urlparse(server_url) - print("Starting `server.py`...") - uvicorn.run(app, host=parsed_url.hostname, port=parsed_url.port) + + async def main(): + # Start listening to the user + asyncio.create_task(user_listener()) + + # Start watching the kernel if it's your job to do that + if os.getenv('CODE_RUNNER') == "server": + asyncio.create_task(put_kernel_messages_into_queue(from_computer)) + + server_url = os.getenv('SERVER_URL') + if not server_url: + raise ValueError("The environment variable SERVER_URL is not set. Please set it to proceed.") + parsed_url = urllib.parse.urlparse(server_url) + print("Starting `server.py`...") + + config = Config(app, host=parsed_url.hostname, port=parsed_url.port, lifespan='on') + server = Server(config) + await server.serve() + + asyncio.run(main()) \ No newline at end of file diff --git a/OS/01/start.sh b/OS/01/start.sh index 130c5ef..3c44810 100755 --- a/OS/01/start.sh +++ b/OS/01/start.sh @@ -12,7 +12,7 @@ export DEVICE_START=True # Control where various operations happen— can be `device` or `server`. export CODE_RUNNER=server -export TTS_RUNNER=device # If server, audio will be sent over websocket. +export TTS_RUNNER=server # If device, audio will be sent over websocket. export STT_RUNNER=device # If server, audio will be sent over websocket. # Will expose the server publically and display that URL. @@ -22,10 +22,14 @@ export SERVER_EXPOSE_PUBLICALLY=False # (for dev, reset the ports we were using) -PORT=$(echo $SERVER_URL | grep -oE "[0-9]+") -lsof -ti tcp:$PORT | xargs kill -PORT=$(echo $DEVICE_URL | grep -oE "[0-9]+") -lsof -ti tcp:$PORT | xargs kill +SERVER_PORT=$(echo $SERVER_URL | grep -oE "[0-9]+") +if [ -n "$SERVER_PORT" ]; then + lsof -ti tcp:$SERVER_PORT | xargs kill +fi +DEVICE_PORT=$(echo $DEVICE_URL | grep -oE "[0-9]+") +if [ -n "$DEVICE_PORT" ]; then + lsof -ti tcp:$DEVICE_PORT | xargs kill +fi # Check the current Python version PYTHON_VERSION=$(python -V 2>&1 | cut -d " " -f 2 | cut -d "." -f 1-2) diff --git a/OS/01/stt.py b/OS/01/stt.py index 7afe1ed..d9ac3fa 100644 --- a/OS/01/stt.py +++ b/OS/01/stt.py @@ -44,18 +44,21 @@ def export_audio_to_wav_ffmpeg(audio: bytearray, mime_type: str) -> str: os.remove(input_path) os.remove(output_path) -def stt(audio_bytes: bytearray, mime_type): +def stt_bytes(audio_bytes: bytearray, mime_type="audio/wav"): with export_audio_to_wav_ffmpeg(audio_bytes, mime_type) as wav_file_path: - audio_file = open(wav_file_path, "rb") - try: - transcript = client.audio.transcriptions.create( - model="whisper-1", - file=audio_file, - response_format="text" - ) - except openai.BadRequestError as e: - print("openai.BadRequestError:", e) - return None - - print("Exciting transcription result:", transcript) - return transcript + return stt_wav(wav_file_path) + +def stt_wav(wav_file_path: str): + audio_file = open(wav_file_path, "rb") + try: + transcript = client.audio.transcriptions.create( + model="whisper-1", + file=audio_file, + response_format="text" + ) + except openai.BadRequestError as e: + print("openai.BadRequestError:", e) + return None + + print("Exciting transcription result:", transcript) + return transcript diff --git a/OS/01/tts.py b/OS/01/tts.py index 14e1ba0..027dfc5 100644 --- a/OS/01/tts.py +++ b/OS/01/tts.py @@ -9,7 +9,7 @@ from pydub.playback import play client = OpenAI() -def tts(text): +def tts(text, play_audio): response = client.audio.speech.create( model="tts-1", voice="alloy", @@ -19,9 +19,10 @@ def tts(text): with tempfile.NamedTemporaryFile() as temp_file: response.stream_to_file(temp_file.name) - audio = AudioSegment.from_file(temp_file.name, format="mp3") - # Gradual fade in and out over 0.2 seconds - audio = audio.fade_in(200).fade_out(200) - play(audio) + if play_audio: + audio = AudioSegment.from_file(temp_file.name, format="mp3") + # Gradual fade in and out over 0.2 seconds + audio = audio.fade_in(200).fade_out(200) + play(audio) return temp_file.read() diff --git a/OS/01/utils/check_filtered_kernel.py b/OS/01/utils/check_filtered_kernel.py index f31c6ac..f0a441d 100644 --- a/OS/01/utils/check_filtered_kernel.py +++ b/OS/01/utils/check_filtered_kernel.py @@ -1,5 +1,13 @@ +""" +Watches the kernel. When it sees something that passes a filter, +it sends POST request with that to /computer. +""" + import subprocess +import time +import requests import platform +import os def get_kernel_messages(): """ @@ -31,10 +39,11 @@ def custom_filter(message): return message else: return None - + last_messages = "" def check_filtered_kernel(): + while True: messages = get_kernel_messages() messages.replace(last_messages, "") messages = messages.split("\n") @@ -43,4 +52,4 @@ def check_filtered_kernel(): for message in messages: if custom_filter(message): filtered_messages.append(message) - return filtered_messages \ No newline at end of file + return "\n".join(filtered_messages) diff --git a/OS/01/utils/put_kernel_messages_into_queue.py b/OS/01/utils/put_kernel_messages_into_queue.py new file mode 100644 index 0000000..e3a2702 --- /dev/null +++ b/OS/01/utils/put_kernel_messages_into_queue.py @@ -0,0 +1,17 @@ +from .check_filtered_kernel import check_filtered_kernel +import asyncio + +async def put_kernel_messages_into_queue(queue): + while True: + text = check_filtered_kernel() + if text: + if isinstance(queue, asyncio.Queue): + await queue.put({"role": "computer", "type": "console", "start": True}) + await queue.put({"role": "computer", "type": "console", "format": "output", "content": text}) + await queue.put({"role": "computer", "type": "console", "end": True}) + else: + queue.put({"role": "computer", "type": "console", "start": True}) + queue.put({"role": "computer", "type": "console", "format": "output", "content": text}) + queue.put({"role": "computer", "type": "console", "end": True}) + + await asyncio.sleep(5) \ No newline at end of file