From 621c5cd048425845ea1dc20ed250c69dfe573859 Mon Sep 17 00:00:00 2001 From: Davy Peter Braun <543614+dheavy@users.noreply.github.com> Date: Fri, 29 Mar 2024 16:46:47 +0100 Subject: [PATCH] Fix missing Device for Windows --- software/source/clients/base_device.py | 173 +++++++++++--------- software/source/clients/windows/__init__.py | 0 software/source/clients/windows/device.py | 10 ++ software/source/server/server.py | 39 ++--- software/source/server/utils/kernel.py | 115 +++++++------ software/start.py | 44 ++--- 6 files changed, 212 insertions(+), 169 deletions(-) create mode 100644 software/source/clients/windows/__init__.py create mode 100644 software/source/clients/windows/device.py diff --git a/software/source/clients/base_device.py b/software/source/clients/base_device.py index 40af5a2..6d0f645 100644 --- a/software/source/clients/base_device.py +++ b/software/source/clients/base_device.py @@ -1,5 +1,8 @@ from dotenv import load_dotenv +from source.server.utils.logs import setup_logging, logger + load_dotenv() # take environment variables from .env. +setup_logging() import os import asyncio @@ -11,7 +14,7 @@ from queue import Queue from pynput import keyboard import json import traceback -import websockets +import websocket as wsc import queue import pydub import ast @@ -26,7 +29,7 @@ import cv2 import base64 from interpreter import interpreter # Just for code execution. Maybe we should let people do from interpreter.computer import run? # In the future, I guess kernel watching code should be elsewhere? Somewhere server / client agnostic? -from ..server.utils.kernel import put_kernel_messages_into_queue +from ..server.utils.kernel import KernelChecker, put_kernel_messages_into_queue from ..server.utils.get_system_info import get_system_info from ..server.utils.process_utils import kill_process_tree @@ -41,13 +44,11 @@ from ..utils.accumulator import Accumulator accumulator = Accumulator() -# Configuration for Audio Recording -CHUNK = 1024 # Record in chunks of 1024 samples -FORMAT = pyaudio.paInt16 # 16 bits per sample -CHANNELS = 1 # Mono -RATE = 44100 # Sample rate -RECORDING = False # Flag to control recording state -SPACEBAR_PRESSED = False # Flag to track spacebar press state +# AudioSegment configuration +AUDIO_SAMPLE_WIDTH = 2 +AUDIO_FRAME_RATE = 16000 +AUDIO_MONO_CHANNEL = 1 +AUDIO_CODE_RUNNER_CLIENT = "client" # Camera configuration CAMERA_ENABLED = os.getenv('CAMERA_ENABLED', False) @@ -71,6 +72,14 @@ class Device: self.audiosegments = [] self.server_url = "" + # Configuration for Audio Recording + self.CHUNK = 1024 # Record in chunks of 1024 samples + self.FORMAT = pyaudio.paInt16 # 16 bits per sample + self.CHANNELS = 1 # Mono + self.RATE = 44100 # Sample rate + self.RECORDING = False # Flag to control recording state + self.SPACEBAR_PRESSED = False # Flag to track spacebar press state + def fetch_image_from_camera(self, camera_index=CAMERA_DEVICE_INDEX): """Captures an image from the specified camera device and saves it to a temporary file. Adds the image to the captured_images list.""" image_path = None @@ -98,7 +107,7 @@ class Device: cap.release() return image_path - + def encode_image_to_base64(self, image_path): """Encodes an image file to a base64 string.""" @@ -124,7 +133,7 @@ class Device: self.add_image_to_send_queue(image_path) self.captured_images.clear() # Clear the list after sending - + async def play_audiosegments(self): """Plays them sequentially.""" while True: @@ -141,7 +150,6 @@ class Device: def record_audio(self): - if os.getenv('STT_RUNNER') == "server": # STT will happen on the server. we're sending audio. send_queue.put({"role": "user", "type": "audio", "format": "bytes.wav", "start": True}) @@ -152,20 +160,19 @@ class Device: raise Exception("STT_RUNNER must be set to either 'client' or 'server'.") """Record audio from the microphone and add it to the queue.""" - stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK) + stream = p.open(format=self.FORMAT, channels=self.CHANNELS, rate=self.RATE, input=True, frames_per_buffer=self.CHUNK) print("Recording started...") - global RECORDING # Create a temporary WAV file to store the audio data temp_dir = tempfile.gettempdir() wav_path = os.path.join(temp_dir, f"audio_{datetime.now().strftime('%Y%m%d%H%M%S%f')}.wav") wav_file = wave.open(wav_path, 'wb') - wav_file.setnchannels(CHANNELS) - wav_file.setsampwidth(p.get_sample_size(FORMAT)) - wav_file.setframerate(RATE) + wav_file.setnchannels(self.CHANNELS) + wav_file.setsampwidth(p.get_sample_size(self.FORMAT)) + wav_file.setframerate(self.RATE) - while RECORDING: - data = stream.read(CHUNK, exception_on_overflow=False) + while self.RECORDING: + data = stream.read(self.CHUNK, exception_on_overflow=False) wav_file.writeframes(data) wav_file.close() @@ -173,7 +180,7 @@ class Device: stream.close() print("Recording stopped.") - duration = wav_file.getnframes() / RATE + duration = wav_file.getnframes() / self.RATE if duration < 0.3: # Just pressed it. Send stop message if os.getenv('STT_RUNNER') == "client": @@ -198,10 +205,10 @@ class Device: else: # Stream audio with open(wav_path, 'rb') as audio_file: - byte_data = audio_file.read(CHUNK) + byte_data = audio_file.read(self.CHUNK) while byte_data: send_queue.put(byte_data) - byte_data = audio_file.read(CHUNK) + byte_data = audio_file.read(self.CHUNK) send_queue.put({"role": "user", "type": "audio", "format": "bytes.wav", "end": True}) if os.path.exists(wav_path): @@ -209,15 +216,14 @@ class Device: def toggle_recording(self, state): """Toggle the recording state.""" - global RECORDING, SPACEBAR_PRESSED - if state and not SPACEBAR_PRESSED: - SPACEBAR_PRESSED = True - if not RECORDING: - RECORDING = True + if state and not self.SPACEBAR_PRESSED: + self.SPACEBAR_PRESSED = True + if not self.RECORDING: + self.RECORDING = True threading.Thread(target=self.record_audio).start() - elif not state and SPACEBAR_PRESSED: - SPACEBAR_PRESSED = False - RECORDING = False + elif not state and self.SPACEBAR_PRESSED: + self.SPACEBAR_PRESSED = False + self.RECORDING = False def on_press(self, key): """Detect spacebar press and Ctrl+C combination.""" @@ -239,7 +245,7 @@ class Device: elif CAMERA_ENABLED and key == keyboard.KeyCode.from_char('c'): self.fetch_image_from_camera() - + async def message_sender(self, websocket): while True: message = await asyncio.get_event_loop().run_in_executor(None, send_queue.get) @@ -254,62 +260,74 @@ class Device: show_connection_log = True while True: try: - async with websockets.connect(WS_URL) as websocket: - if CAMERA_ENABLED: - print("\nHold the spacebar to start recording. Press 'c' to capture an image from the camera. Press CTRL-C to exit.") - else: - print("\nHold the spacebar to start recording. Press CTRL-C to exit.") - - asyncio.create_task(self.message_sender(websocket)) + print("Attempting to connect to the WS server...") - while True: - await asyncio.sleep(0.01) - chunk = await websocket.recv() + try: + ws = wsc.create_connection(WS_URL, timeout=5) + except wsc.WebSocketTimeoutException: + print("Timeout while trying to connect to the WebSocket server.") + continue + print("Connected to the WS server.") - logger.debug(f"Got this message from the server: {type(chunk)} {chunk}") + if CAMERA_ENABLED: + print("\nHold the spacebar to start recording. Press 'c' to capture an image from the camera. Press CTRL-C to exit.") + else: + print("\nHold the spacebar to start recording. Press CTRL-C to exit.") - if type(chunk) == str: - chunk = json.loads(chunk) + asyncio.create_task(self.message_sender(ws)) - message = accumulator.accumulate(chunk) - if message == None: - # Will be None until we have a full message ready - continue + while True: + await asyncio.sleep(0.01) + chunk = await ws.recv() - # At this point, we have our message + logger.debug(f"Got this message from the server: {type(chunk)} {chunk}") - if message["type"] == "audio" and message["format"].startswith("bytes"): + if isinstance(chunk, str): + chunk = json.loads(chunk) - # Convert bytes to audio file + message = accumulator.accumulate(chunk) + if message is None: + # Will be None until we have a full message ready + continue - audio_bytes = message["content"] + # At this point, we have our message - # Create an AudioSegment instance with the raw data - audio = AudioSegment( - # raw audio data (bytes) - data=audio_bytes, - # signed 16-bit little-endian format - sample_width=2, - # 16,000 Hz frame rate - frame_rate=16000, - # mono sound - channels=1 - ) + if message["type"] == "audio" and message["format"].startswith("bytes"): + # Convert bytes to audio file + audio_bytes = message["content"] - self.audiosegments.append(audio) + # Create an AudioSegment instance with the raw data + audio = AudioSegment( + data=audio_bytes, + sample_width=AUDIO_SAMPLE_WIDTH, + frame_rate=AUDIO_FRAME_RATE, + channels=AUDIO_MONO_CHANNEL + ) - # Run the code if that's the client's job - if os.getenv('CODE_RUNNER') == "client": - if message["type"] == "code" and "end" in message: - language = message["format"] - code = message["content"] - result = interpreter.computer.run(language, code) - send_queue.put(result) - except: + self.audiosegments.append(audio) + + # Run the code if that's the client's job + if os.getenv('CODE_RUNNER') == AUDIO_CODE_RUNNER_CLIENT: + if message["type"] == "code" and "end" in message: + language = message["format"] + code = message["content"] + result = interpreter.computer.run(language, code) + send_queue.put(result) + + except wsc.WebSocketConnectionClosedException: + print("WebSocket connection closed unexpectedly.") + if show_connection_log: + logger.info(f"Reconnecting to `{WS_URL}`...") + show_connection_log = False + await asyncio.sleep(2) + except wsc.WebSocketAddressException: + print(f"Invalid WebSocket URI: `{WS_URL}`. Please check the URI and try again.") + break # Exit the loop as the URI is invalid and retrying won't help + except Exception as e: logger.debug(traceback.format_exc()) if show_connection_log: - logger.info(f"Connecting to `{WS_URL}`...") - show_connection_log = False + logger.info(f"Connecting to `{WS_URL}`...") + show_connection_log = False await asyncio.sleep(2) async def start_async(self): @@ -320,13 +338,14 @@ class Device: # Start watching the kernel if it's your job to do that if os.getenv('CODE_RUNNER') == "client": - asyncio.create_task(put_kernel_messages_into_queue(send_queue)) + kernel_checker = KernelChecker() + asyncio.create_task(put_kernel_messages_into_queue(kernel_checker, send_queue)) asyncio.create_task(self.play_audiosegments()) - + # If Raspberry Pi, add the button listener, otherwise use the spacebar if current_platform.startswith("raspberry-pi"): - logger.info("Raspberry Pi detected, using button on GPIO pin 15") + print("Raspberry Pi detected, using button on GPIO pin 15") # Use GPIO pin 15 pindef = ["gpiochip4", "15"] # gpiofind PIN15 print("PINDEF", pindef) diff --git a/software/source/clients/windows/__init__.py b/software/source/clients/windows/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/software/source/clients/windows/device.py b/software/source/clients/windows/device.py new file mode 100644 index 0000000..a9a79c0 --- /dev/null +++ b/software/source/clients/windows/device.py @@ -0,0 +1,10 @@ +from ..base_device import Device + +device = Device() + +def main(server_url): + device.server_url = server_url + device.start() + +if __name__ == "__main__": + main() diff --git a/software/source/server/server.py b/software/source/server/server.py index f468634..a5ecbd0 100644 --- a/software/source/server/server.py +++ b/software/source/server/server.py @@ -16,7 +16,7 @@ from starlette.websockets import WebSocket, WebSocketDisconnect from pathlib import Path import asyncio import urllib.parse -from .utils.kernel import put_kernel_messages_into_queue +from .utils.kernel import KernelChecker, put_kernel_messages_into_queue from .i import configure_interpreter from interpreter import interpreter from ..utils.accumulator import Accumulator @@ -86,7 +86,7 @@ if os.getenv('CODE_RUNNER') == "device": to_device.put({"role": "assistant", "type": "code", "format": "python", "start": True}) to_device.put(message) to_device.put({"role": "assistant", "type": "code", "format": "python", "end": True}) - + # Stream the response logger.info("Waiting for the device to respond...") while True: @@ -165,13 +165,13 @@ async def receive_messages(websocket: WebSocket): return else: raise - + async def send_messages(websocket: WebSocket): while True: message = await to_device.get() #print(f"Sending to the device: {type(message)} {str(message)[:100]}") - + try: if isinstance(message, dict): await websocket.send_json(message) @@ -197,7 +197,7 @@ async def listener(): break await asyncio.sleep(1) - + message = accumulator.accumulate(chunk) if message == None: @@ -251,7 +251,7 @@ async def listener(): if any([m["type"] == "image" for m in messages]) and interpreter.llm.model.startswith("gpt-"): interpreter.llm.model = "gpt-4-vision-preview" interpreter.llm.supports_vision = True - + for chunk in interpreter.chat(messages, stream=True, display=True): if any([m["type"] == "image" for m in interpreter.messages]): @@ -263,18 +263,18 @@ async def listener(): await to_device.put(chunk) # Yield to the event loop, so you actually send it out await asyncio.sleep(0.01) - + if os.getenv('TTS_RUNNER') == "server": # Speak full sentences out loud if chunk["role"] == "assistant" and "content" in chunk and chunk["type"] == "message": accumulated_text += chunk["content"] sentences = split_into_sentences(accumulated_text) - + # If we're going to speak, say we're going to stop sending text. # This should be fixed probably, we should be able to do both in parallel, or only one. if any(is_full_sentence(sentence) for sentence in sentences): await to_device.put({"role": "assistant", "type": "message", "end": True}) - + if is_full_sentence(sentences[-1]): for sentence in sentences: await stream_tts_to_device(sentence) @@ -288,13 +288,13 @@ async def listener(): # This should be fixed probably, we should be able to do both in parallel, or only one. if any(is_full_sentence(sentence) for sentence in sentences): await to_device.put({"role": "assistant", "type": "message", "start": True}) - + # If we have a new message, save our progress and go back to the top if not from_user.empty(): # Check if it's just an end flag. We ignore those. temp_message = await from_user.get() - + if type(temp_message) is dict and temp_message.get("role") == "user" and temp_message.get("end"): # Yup. False alarm. continue @@ -311,7 +311,7 @@ async def listener(): # Also check if there's any new computer messages if not from_computer.empty(): - + with open(conversation_history_path, 'w') as file: json.dump(interpreter.messages, file, indent=4) @@ -333,7 +333,7 @@ async def stream_tts_to_device(sentence): await to_device.put(chunk) def stream_tts(sentence): - + audio_file = tts(sentence) with open(audio_file, "rb") as f: @@ -382,13 +382,13 @@ async def main(server_host, server_port, llm_service, model, llm_supports_vision services_directory = os.path.join(application_directory, 'services') service_dict = {'llm': llm_service, 'tts': tts_service, 'stt': stt_service} - + # Create a temp file with the session number session_file_path = os.path.join(user_data_dir('01'), '01-session.txt') with open(session_file_path, 'w') as session_file: session_id = int(datetime.datetime.now().timestamp() * 1000) session_file.write(str(session_id)) - + for service in service_dict: service_directory = os.path.join(services_directory, service, service_dict[service]) @@ -408,20 +408,21 @@ async def main(server_host, server_port, llm_service, model, llm_supports_vision }) module = import_module(f'.server.services.{service}.{service_dict[service]}.{service}', package='source') - + ServiceClass = getattr(module, service.capitalize()) service_instance = ServiceClass(config) globals()[service] = getattr(service_instance, service) interpreter.llm.completions = llm - + # Start listening asyncio.create_task(listener()) # Start watching the kernel if it's your job to do that if True: # in the future, code can run on device. for now, just server. - asyncio.create_task(put_kernel_messages_into_queue(from_computer)) - + kernel_checker = KernelChecker() + asyncio.create_task(put_kernel_messages_into_queue(kernel_checker, from_computer)) + config = Config(app, host=server_host, port=int(server_port), lifespan='on') server = Server(config) await server.serve() diff --git a/software/source/server/utils/kernel.py b/software/source/server/utils/kernel.py index b443bba..cdca961 100644 --- a/software/source/server/utils/kernel.py +++ b/software/source/server/utils/kernel.py @@ -1,63 +1,74 @@ -from dotenv import load_dotenv -load_dotenv() # take environment variables from .env. - import asyncio import subprocess import platform +from dotenv import load_dotenv +from .logs import setup_logging, logger -from .logs import setup_logging -from .logs import logger +load_dotenv() # take environment variables from .env. setup_logging() -def get_kernel_messages(): - """ - Is this the way to do this? - """ - current_platform = platform.system() - - if current_platform == "Darwin": - process = subprocess.Popen(['syslog'], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) - output, _ = process.communicate() - return output.decode('utf-8') - elif current_platform == "Linux": - with open('/var/log/dmesg', 'r') as file: - return file.read() - else: - logger.info("Unsupported platform.") +class KernelChecker: + def __init__(self): + self._last_messages = "" + + def get_kernel_messages(self): + """ + Fetch system logs or kernel message from the operating system. -def custom_filter(message): - # Check for {TO_INTERPRETER{ message here }TO_INTERPRETER} pattern - if '{TO_INTERPRETER{' in message and '}TO_INTERPRETER}' in message: - start = message.find('{TO_INTERPRETER{') + len('{TO_INTERPRETER{') - end = message.find('}TO_INTERPRETER}', start) - return message[start:end] - # Check for USB mention - # elif 'USB' in message: - # return message - # # Check for network related keywords - # elif any(keyword in message for keyword in ['network', 'IP', 'internet', 'LAN', 'WAN', 'router', 'switch']) and "networkStatusForFlags" not in message: - - # return message - else: - return None - -last_messages = "" + - For MacOS, it uses syslog. + - For Linux, it uses dmesg. + - For Windows, it uses wevtutil with the 'qe' (query events) from the 'System' log + with the '/f:text' (format text) flag. + """ + current_platform = platform.system().lower() -def check_filtered_kernel(): - messages = get_kernel_messages() - messages.replace(last_messages, "") - messages = messages.split("\n") - - filtered_messages = [] - for message in messages: - if custom_filter(message): - filtered_messages.append(message) - - return "\n".join(filtered_messages) + if current_platform == "darwin": + process = subprocess.Popen(['syslog'], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) + output, _ = process.communicate() + return output.decode('utf-8') + elif current_platform == "linux": + with open('/var/log/dmesg', 'r') as file: + return file.read() + elif current_platform == "windows": + process = subprocess.Popen(['wevtutil', 'qe', 'System', '/f:text'], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) + output, _ = process.communicate() + try: + return output.decode('utf-8') + except UnicodeDecodeError: + try: + return output.decode('utf-16') + except UnicodeDecodeError: + return output.decode('cp1252') + else: + logger.info("Unsupported platform.") + return "" -async def put_kernel_messages_into_queue(queue): + def custom_filter(self, message): + if '{TO_INTERPRETER{' in message and '}TO_INTERPRETER}' in message: + start = message.find('{TO_INTERPRETER{') + len('{TO_INTERPRETER{') + end = message.find('}TO_INTERPRETER}', start) + return message[start:end] + else: + return None + + def check_filtered_kernel(self): + try: + messages = self.get_kernel_messages() + messages = messages.replace(self._last_messages, "") + messages = messages.split("\n") + + filtered_messages = [message for message in messages if self.custom_filter(message)] + + self._last_messages = "\n".join(filtered_messages) + return self._last_messages + except Exception as e: + logger.error(f"Error while checking kernel messages: {e}") + return None + + +async def put_kernel_messages_into_queue(kernel_checker, queue): while True: - text = check_filtered_kernel() + text = kernel_checker.check_filtered_kernel() if text: if isinstance(queue, asyncio.Queue): await queue.put({"role": "computer", "type": "console", "start": True}) @@ -67,5 +78,5 @@ async def put_kernel_messages_into_queue(queue): queue.put({"role": "computer", "type": "console", "start": True}) queue.put({"role": "computer", "type": "console", "format": "output", "content": text}) queue.put({"role": "computer", "type": "console", "end": True}) - - await asyncio.sleep(5) \ No newline at end of file + + await asyncio.sleep(5) diff --git a/software/start.py b/software/start.py index 70088e4..b7f96c3 100644 --- a/software/start.py +++ b/software/start.py @@ -15,32 +15,32 @@ app = typer.Typer() @app.command() def run( server: bool = typer.Option(False, "--server", help="Run server"), - server_host: str = typer.Option("0.0.0.0", "--server-host", help="Specify the server host where the server will deploy"), + server_host: str = typer.Option("127.0.0.1", "--server-host", help="Specify the server host where the server will deploy"), server_port: int = typer.Option(10001, "--server-port", help="Specify the server port where the server will deploy"), - + tunnel_service: str = typer.Option("ngrok", "--tunnel-service", help="Specify the tunnel service"), expose: bool = typer.Option(False, "--expose", help="Expose server to internet"), - + client: bool = typer.Option(False, "--client", help="Run client"), server_url: str = typer.Option(None, "--server-url", help="Specify the server URL that the client should expect. Defaults to server-host and server-port"), client_type: str = typer.Option("auto", "--client-type", help="Specify the client type"), - + llm_service: str = typer.Option("litellm", "--llm-service", help="Specify the LLM service"), - + model: str = typer.Option("gpt-4", "--model", help="Specify the model"), llm_supports_vision: bool = typer.Option(False, "--llm-supports-vision", help="Specify if the LLM service supports vision"), llm_supports_functions: bool = typer.Option(False, "--llm-supports-functions", help="Specify if the LLM service supports functions"), context_window: int = typer.Option(2048, "--context-window", help="Specify the context window size"), max_tokens: int = typer.Option(4096, "--max-tokens", help="Specify the maximum number of tokens"), temperature: float = typer.Option(0.8, "--temperature", help="Specify the temperature for generation"), - + tts_service: str = typer.Option("openai", "--tts-service", help="Specify the TTS service"), - + stt_service: str = typer.Option("openai", "--stt-service", help="Specify the STT service"), local: bool = typer.Option(False, "--local", help="Use recommended local services for LLM, STT, and TTS"), ): - + _run( server=server, server_host=server_host, @@ -66,39 +66,39 @@ def _run( server: bool = False, server_host: str = "0.0.0.0", server_port: int = 10001, - + tunnel_service: str = "bore", expose: bool = False, - + client: bool = False, server_url: str = None, client_type: str = "auto", - + llm_service: str = "litellm", - + model: str = "gpt-4", llm_supports_vision: bool = False, llm_supports_functions: bool = False, context_window: int = 2048, max_tokens: int = 4096, temperature: float = 0.8, - + tts_service: str = "openai", - + stt_service: str = "openai", local: bool = False ): - + if local: tts_service = "piper" # llm_service = "llamafile" stt_service = "local-whisper" select_local_model() - + if not server_url: server_url = f"{server_host}:{server_port}" - + if not server and not client: server = True client = True @@ -120,10 +120,12 @@ def _run( if client: if client_type == "auto": - system_type = platform.system() - if system_type == "Darwin": # Mac OS + system_type = platform.system().lower() + if system_type == "darwin": # Mac OS client_type = "mac" - elif system_type == "Linux": # Linux System + elif system_type == "windows": + client_type = "windows" + elif system_type == "linux": # Linux System try: with open('/proc/device-tree/model', 'r') as m: if 'raspberry pi' in m.read().lower(): @@ -145,4 +147,4 @@ def _run( if client: client_thread.join() except KeyboardInterrupt: - os.kill(os.getpid(), signal.SIGINT) \ No newline at end of file + os.kill(os.getpid(), signal.SIGINT)