diff --git a/OS/01/assistant/assistant.py b/OS/01/assistant/assistant.py index 5f56f79..9e54643 100644 --- a/OS/01/assistant/assistant.py +++ b/OS/01/assistant/assistant.py @@ -12,6 +12,7 @@ import json import time import queue import os +from queue import Queue from threading import Thread import uvicorn import re @@ -28,9 +29,7 @@ interpreter = create_interpreter() conversation_history_path = Path(__file__).parent / 'conversations' / 'user.json' -# Create Queue objects -to_user = queue.Queue() -to_assistant = queue.Queue() + # This is so we only say() full sentences def is_full_sentence(text): @@ -41,32 +40,65 @@ def split_into_sentences(text): app = FastAPI() -@app.post("/computer") -async def read_computer(item: dict): - to_assistant.put(item) + + +import asyncio + + +# Global queues +receive_queue = queue.Queue() +send_queue = queue.Queue() + @app.websocket("/user") async def websocket_endpoint(websocket: WebSocket): await websocket.accept() + receive_task = asyncio.create_task(receive_messages(websocket)) + send_task = asyncio.create_task(send_messages(websocket)) + await asyncio.gather(receive_task, send_task) + +async def receive_messages(websocket: WebSocket): + while True: + data = await websocket.receive_text() + receive_queue.put(data) + +async def send_messages(websocket: WebSocket): while True: - try: - data = await websocket.receive_json() - to_assistant.put(data) - while not to_user.empty(): - message = to_user.get() - print("sending a message!") - await websocket.send_json(message) - except WebSocketDisconnect: - pass + message = await asyncio.get_event_loop().run_in_executor(None, send_queue.get) + print(message) + await websocket.send_json(message) + + + + + + + +@app.post("/computer") +async def read_computer(item: dict): + await asyncio.get_event_loop().run_in_executor(None, receive_queue.put, item) + + + + + + + + + + + def queue_listener(): audio_file = bytearray() while True: # Check 10x a second for new messages - while to_assistant.empty(): + while receive_queue.empty(): time.sleep(0.1) - message = to_assistant.get() + message = receive_queue.get() + + message = json.loads(message) # Hold the audio in a buffer. If it's ready (we got end flag, stt it) if message["type"] == "audio": @@ -97,7 +129,7 @@ def queue_listener(): for chunk in interpreter.chat(messages, stream=True): # Send it to the user - to_user.put(chunk) + send_queue.put(chunk) # Speak full sentences out loud if chunk["role"] == "assistant" and "content" in chunk: @@ -122,16 +154,16 @@ def queue_listener(): print("Accumulated text is now the last sentence: ", accumulated_text) # If we have a new message, save our progress and go back to the top - if not to_assistant.empty(): + if not receive_queue.empty(): with open(conversation_history_path, 'w') as file: json.dump(interpreter.messages, file) break def stream_tts_to_user(sentence): - to_user.put({"role": "assistant", "type": "audio", "format": "audio/mp3", "start": True}) + send_queue.put({"role": "assistant", "type": "audio", "format": "audio/mp3", "start": True}) audio_bytes = tts(sentence) - to_user.put({"role": "assistant", "type": "audio", "format": "audio/mp3", "content": str(audio_bytes)}) - to_user.put({"role": "assistant", "type": "audio", "format": "audio/mp3", "end": True}) + send_queue.put({"role": "assistant", "type": "audio", "format": "audio/mp3", "content": str(audio_bytes)}) + send_queue.put({"role": "assistant", "type": "audio", "format": "audio/mp3", "end": True}) # Create a thread for the queue listener queue_thread = Thread(target=queue_listener) diff --git a/OS/01/assistant/conversations/user.json b/OS/01/assistant/conversations/user.json index 0637a08..02895a6 100644 --- a/OS/01/assistant/conversations/user.json +++ b/OS/01/assistant/conversations/user.json @@ -1 +1 @@ -[] \ No newline at end of file +[{"role": "user", "type": "message", "content": "Disgusting.\n"}] \ No newline at end of file diff --git a/OS/01/assistant/skills/schedule.py b/OS/01/assistant/skills/schedule.py index 5ca9c23..fc62147 100644 --- a/OS/01/assistant/skills/schedule.py +++ b/OS/01/assistant/skills/schedule.py @@ -1,11 +1,7 @@ import threading from datetime import datetime import json -import time -import redis - -# Connect to Redis -r = redis.Redis() +import subprocess def add_message_to_queue(message): @@ -16,9 +12,7 @@ def add_message_to_queue(message): "format": "output", "content": message }) - - # Add the message to the 'to_main' queue - r.rpush('to_main', message_json) + subprocess.run(['logger', '{TO_INTERPRETER{' + message_json + '}TO_INTERPRETER}']) def schedule(dt, message): # Calculate the delay in seconds diff --git a/OS/01/assistant/tts.py b/OS/01/assistant/tts.py index d752295..1dff048 100644 --- a/OS/01/assistant/tts.py +++ b/OS/01/assistant/tts.py @@ -4,6 +4,8 @@ Defines a function which takes text and returns a path to an audio file. import tempfile from openai import OpenAI +from pydub import AudioSegment +from pydub.playback import play client = OpenAI() @@ -16,4 +18,10 @@ def tts(text): ) with tempfile.NamedTemporaryFile() as temp_file: response.stream_to_file(temp_file.name) + + # audio = AudioSegment.from_file(temp_file.name, format="mp3") + # # Gradual fade in and out over 0.2 seconds + # audio = audio.fade_in(200).fade_out(200) + # play(audio) + return temp_file.read() diff --git a/OS/01/computer/kernel_watcher.py b/OS/01/computer/kernel_watcher.py index 2a3b99f..383609e 100644 --- a/OS/01/computer/kernel_watcher.py +++ b/OS/01/computer/kernel_watcher.py @@ -9,87 +9,21 @@ import requests import platform import os -class Device: - def __init__(self, device_type, device_info): - self.device_type = device_type - self.device_info = device_info - - def get_device_info(self): - info = f"Device Type: {self.device_type}\n" - for key, value in self.device_info.items(): - info += f"{key}: {value}\n" - return info - - def __eq__(self, other): - if isinstance(other, Device): - return self.device_type == other.device_type and self.device_info == other.device_info - return False - - -def get_connected_devices(): - """ - Get all connected devices on macOS using system_profiler - """ - devices = [] - usb_output = subprocess.check_output(['system_profiler', 'SPUSBDataType']) - network_output = subprocess.check_output(['system_profiler', 'SPNetworkDataType']) - - usb_lines = usb_output.decode('utf-8').split('\n') - network_lines = network_output.decode('utf-8').split('\n') - - device_info = {} - for line in usb_lines: - if 'Product ID:' in line or 'Serial Number:' in line or 'Manufacturer:' in line: - key, value = line.strip().split(':') - device_info[key.strip()] = value.strip() - if 'Manufacturer:' in line: - devices.append(Device('USB', device_info)) - device_info = {} - - for line in network_lines: - if 'Type:' in line or 'Hardware:' in line or 'BSD Device Name:' in line: - key, value = line.strip().split(':') - device_info[key.strip()] = value.strip() - if 'BSD Device Name:' in line: - devices.append(Device('Network', device_info)) - device_info = {} - - return devices - - -def run_kernel_watch_darwin(): - prev_connected_devices = None - while True: - messages_to_send = [] - connected_devices = get_connected_devices() - if prev_connected_devices is not None: - for device in connected_devices: - if device not in prev_connected_devices: - messages_to_send.append(f'New device connected: {device.get_device_info()}') - for device in prev_connected_devices: - if device not in connected_devices: - messages_to_send.append(f'Device disconnected: {device.get_device_info()}') - - if messages_to_send: - requests.post('http://localhost:8000/computer', json = {'messages': messages_to_send}) - prev_connected_devices = connected_devices - - time.sleep(2) - - -def get_dmesg(after): +def get_kernel_messages(): """ Is this the way to do this? """ - messages = [] - with open('/var/log/dmesg', 'r') as file: - lines = file.readlines() - for line in lines: - timestamp = float(line.split(' ')[0].strip('[]')) - if timestamp > after: - messages.append(line) - return messages - + current_platform = platform.system() + + if current_platform == "Darwin": + process = subprocess.Popen(['syslog'], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) + output, _ = process.communicate() + return output.decode('utf-8') + elif current_platform == "Linux": + with open('/var/log/dmesg', 'r') as file: + return file.read() + else: + print("Unsupported platform.") def custom_filter(message): # Check for {TO_INTERPRETER{ message here }TO_INTERPRETER} pattern @@ -107,12 +41,12 @@ def custom_filter(message): return None -def run_kernel_watch_linux(): - last_timestamp = time.time() - +def main(): + last_messages = "" while True: - messages = get_dmesg(after=last_timestamp) - last_timestamp = time.time() + messages = get_kernel_messages() + messages.replace(last_messages, "") + messages = messages.split("\n") messages_for_core = [] for message in messages: @@ -121,14 +55,7 @@ def run_kernel_watch_linux(): if messages_for_core: port = os.getenv('ASSISTANT_PORT', 8000) requests.post(f'http://localhost:{port}/computer', json = {'messages': messages_for_core}) - time.sleep(2) - + time.sleep(5) if __name__ == "__main__": - current_platform = platform.system() - if current_platform == "Darwin": - run_kernel_watch_darwin() - elif current_platform == "Linux": - run_kernel_watch_linux() - else: - print("Unsupported platform. Exiting.") + main() diff --git a/OS/01/user/user.py b/OS/01/user/user.py index 458bd56..e2768af 100644 --- a/OS/01/user/user.py +++ b/OS/01/user/user.py @@ -1,11 +1,13 @@ import asyncio import threading -import websockets import os import pyaudio +from starlette.websockets import WebSocket from queue import Queue from pynput import keyboard import json +import websockets +import queue import pydub import ast @@ -24,8 +26,6 @@ WS_URL = f"ws://localhost:{PORT}/user" # Initialize PyAudio p = pyaudio.PyAudio() -# Queue for sending data -data_queue = Queue() import wave import tempfile @@ -59,10 +59,10 @@ def record_audio(): with open(wav_path, 'rb') as audio_file: byte_data = audio_file.read(CHUNK) while byte_data: - data_queue.put({"role": "user", "type": "audio", "format": "audio/wav", "content": str(byte_data)}) + send_queue.put({"role": "user", "type": "audio", "format": "audio/wav", "content": str(byte_data)}) byte_data = audio_file.read(CHUNK) - data_queue.put({"role": "user", "type": "audio", "format": "audio/wav", "end": True}) + send_queue.put({"role": "user", "type": "audio", "format": "audio/wav", "end": True}) def toggle_recording(state): @@ -77,48 +77,6 @@ def toggle_recording(state): SPACEBAR_PRESSED = False RECORDING = False -async def websocket_communication(): - """Handle WebSocket communication and listen for incoming messages.""" - while True: - try: - async with websockets.connect(WS_URL) as websocket: - - print("Press the spacebar to start/stop recording. Press ESC to exit.") - - while True: - # Send data from the queue to the server - while not data_queue.empty(): - data = data_queue.get_nowait() - await websocket.send(json.dumps(data)) - - # Listen for incoming messages from the server - try: - chunk = await asyncio.wait_for(websocket.recv(), timeout=1.0) - print(f"Received from server: {str(chunk)[:100]}") - - if chunk["type"] == "audio": - if "start" in chunk: - audio_chunks = bytearray() - if "content" in chunk: - audio_chunks.extend(bytes(ast.literal_eval(chunk["content"]))) - if "end" in chunk: - with tempfile.NamedTemporaryFile(suffix=".mp3") as f: - f.write(audio_chunks) - f.seek(0) - seg = pydub.AudioSegment.from_mp3(f.name) - pydub.playback.play(seg) - - except asyncio.TimeoutError: - # No message received within timeout period - pass - - await asyncio.sleep(0.1) - except Exception as e: - print(f"Websocket not ready, retrying... ({e})") - await asyncio.sleep(1) - - - def on_press(key): """Detect spacebar press.""" if key == keyboard.Key.space: @@ -132,14 +90,37 @@ def on_release(key): print("Exiting...") os._exit(0) +import asyncio + +send_queue = queue.Queue() + +async def message_sender(websocket): + while True: + message = await asyncio.get_event_loop().run_in_executor(None, send_queue.get) + await websocket.send(json.dumps(message)) + send_queue.task_done() + +async def websocket_communication(WS_URL): + while True: + try: + async with websockets.connect(WS_URL) as websocket: + print("Press the spacebar to start/stop recording. Press ESC to exit.") + asyncio.create_task(message_sender(websocket)) + + async for message in websocket: + print(message) + await asyncio.sleep(1) + except: + print("Connecting...") + await asyncio.sleep(2) + def main(): # Start the WebSocket communication in a separate asyncio event loop - ws_thread = threading.Thread(target=lambda: asyncio.run(websocket_communication()), daemon=True) + ws_thread = threading.Thread(target=lambda: asyncio.run(websocket_communication(WS_URL)), daemon=True) ws_thread.start() # Keyboard listener for spacebar press/release with keyboard.Listener(on_press=on_press, on_release=on_release) as listener: - print("In a moment, press the spacebar to start/stop recording. Press ESC to exit.") listener.join() p.terminate()