diff --git a/software/source/clients/base_device.py b/software/source/clients/base_device.py index 767f4b0..a53e915 100644 --- a/software/source/clients/base_device.py +++ b/software/source/clients/base_device.py @@ -92,7 +92,8 @@ class Device: self.audiosegments = asyncio.Queue() self.server_url = "" self.ctrl_pressed = False - self.playback_latency = None + self.tts_service = "" + # self.playback_latency = None def fetch_image_from_camera(self, camera_index=CAMERA_DEVICE_INDEX): """Captures an image from the specified camera device and saves it to a temporary file. Adds the image to the captured_images list.""" @@ -166,29 +167,18 @@ class Device: while True: try: audio = await self.audiosegments.get() - # print("got audio segment!!!!") - if self.playback_latency: - elapsed_time = time.time() - self.playback_latency - print(f"Time from request to playback: {elapsed_time} seconds") - self.playback_latency = None - """ - if audio is not None: + # if self.playback_latency and isinstance(audio, bytes): + # elapsed_time = time.time() - self.playback_latency + # print(f"Time from request to playback: {elapsed_time} seconds") + # self.playback_latency = None + + if self.tts_service == "elevenlabs": mpv_process.stdin.write(audio) # type: ignore mpv_process.stdin.flush() # type: ignore + else: + play(audio) - args = ["ffplay", "-autoexit", "-", "-nodisp"] - proc = subprocess.Popen( - args=args, - stdout=subprocess.PIPE, - stdin=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - out, err = proc.communicate(input=audio) - proc.poll() - """ - play(audio) - # self.audiosegments.remove(audio) - # await asyncio.sleep(0.1) + await asyncio.sleep(0.1) except asyncio.exceptions.CancelledError: # This happens once at the start? pass @@ -236,7 +226,7 @@ class Device: stream.stop_stream() stream.close() print("Recording stopped.") - self.playback_latency = time.time() + # self.playback_latency = time.time() duration = wav_file.getnframes() / RATE if duration < 0.3: @@ -343,26 +333,19 @@ class Device: async def message_sender(self, websocket): while True: - try: - message = await asyncio.get_event_loop().run_in_executor( - None, send_queue.get - ) - - if isinstance(message, bytes): - await websocket.send(message) - - else: - await websocket.send(json.dumps(message)) - - send_queue.task_done() - await asyncio.sleep(0.01) - except: - traceback.print_exc() + message = await asyncio.get_event_loop().run_in_executor( + None, send_queue.get + ) + if isinstance(message, bytes): + await websocket.send(message) + else: + await websocket.send(json.dumps(message)) + send_queue.task_done() + await asyncio.sleep(0.01) async def websocket_communication(self, WS_URL): - print("websocket communication was called!!!!") show_connection_log = True - """ + async def exec_ws_communication(websocket): if CAMERA_ENABLED: print( @@ -374,46 +357,46 @@ class Device: asyncio.create_task(self.message_sender(websocket)) while True: - await asyncio.sleep(0) + await asyncio.sleep(0.01) chunk = await websocket.recv() - #logger.debug(f"Got this message from the server: {type(chunk)} {chunk}") - print((f"Got this message from the server: {type(chunk)}")) + logger.debug(f"Got this message from the server: {type(chunk)} {chunk}") + # print("received chunk from server") + if type(chunk) == str: chunk = json.loads(chunk) - # message = accumulator.accumulate(chunk) - message = chunk + if self.tts_service == "elevenlabs": + message = chunk + else: + message = accumulator.accumulate(chunk) + if message == None: # Will be None until we have a full message ready continue # At this point, we have our message - print("checkpoint reached!") - if isinstance(message, bytes): - - # if message["type"] == "audio" and message["format"].startswith("bytes"): + if isinstance(message, bytes) or ( + message["type"] == "audio" and message["format"].startswith("bytes") + ): # Convert bytes to audio file - - # audio_bytes = message["content"] - audio_bytes = message - - # Create an AudioSegment instance with the raw data - - audio = AudioSegment( - # raw audio data (bytes) - data=audio_bytes, - # signed 16-bit little-endian format - sample_width=2, - # 24,000 Hz frame rate - frame_rate=24000, - # mono sound - channels=1, - ) - - - print("audio segment was created") - #await self.audiosegments.put(audio_bytes) + if self.tts_service == "elevenlabs": + audio_bytes = message + audio = audio_bytes + else: + audio_bytes = message["content"] + + # Create an AudioSegment instance with the raw data + audio = AudioSegment( + # raw audio data (bytes) + data=audio_bytes, + # signed 16-bit little-endian format + sample_width=2, + # 16,000 Hz frame rate + frame_rate=22050, + # mono sound + channels=1, + ) await self.audiosegments.put(audio) @@ -425,7 +408,6 @@ class Device: result = interpreter.computer.run(language, code) send_queue.put(result) - """ if is_win10(): logger.info("Windows 10 detected") # Workaround for Windows 10 not latching to the websocket server. @@ -436,54 +418,29 @@ class Device: except Exception as e: logger.error(f"Error while attempting to connect: {e}") else: - print("websocket url is", WS_URL) - i = 0 - # while True: - # try: - # i += 1 - # print("i is", i) - - # # Hit the /ping endpoint - # ping_url = f"http://{self.server_url}/ping" - # response = requests.get(ping_url) - # print(response.text) - # # async with aiohttp.ClientSession() as session: - # # async with session.get(ping_url) as response: - # # print(f"Ping response: {await response.text()}") - - for i in range(3): - print(i) + while True: try: async with websockets.connect(WS_URL) as websocket: - print("happi happi happi :DDDDDDDDDDDDD") - # await exec_ws_communication(websocket) - # print("exiting exec_ws_communication") + await exec_ws_communication(websocket) except: - print("exception in websocket communication!!!!!!!!!!!!!!!!!") - traceback.print_exc() - - # except: - # print("exception in websocket communication!!!!!!!!!!!!!!!!!") - # traceback.print_exc() - # if show_connection_log: - # logger.info(f"Connecting to `{WS_URL}`...") - # show_connection_log = False - # await asyncio.sleep(2) + logger.debug(traceback.format_exc()) + if show_connection_log: + logger.info(f"Connecting to `{WS_URL}`...") + show_connection_log = False + await asyncio.sleep(2) async def start_async(self): - print("start async was called!!!!!") # Configuration for WebSocket - WS_URL = f"ws://{self.server_url}/" - + WS_URL = f"ws://{self.server_url}" # Start the WebSocket communication await self.websocket_communication(WS_URL) - """ # Start watching the kernel if it's your job to do that if os.getenv("CODE_RUNNER") == "client": + # client is not running code! asyncio.create_task(put_kernel_messages_into_queue(send_queue)) - #asyncio.create_task(self.play_audiosegments()) + asyncio.create_task(self.play_audiosegments()) # If Raspberry Pi, add the button listener, otherwise use the spacebar if current_platform.startswith("raspberry-pi"): @@ -507,14 +464,13 @@ class Device: else: break else: - """ - # Keyboard listener for spacebar press/release - # listener = keyboard.Listener(on_press=self.on_press, on_release=self.on_release) - # listener.start() - # print("listener for keyboard started!!!!!") + # Keyboard listener for spacebar press/release + listener = keyboard.Listener( + on_press=self.on_press, on_release=self.on_release + ) + listener.start() def start(self): - print("device was started!!!!!!") if os.getenv("TEACH_MODE") != "True": asyncio.run(self.start_async()) p.terminate() diff --git a/software/source/clients/linux/device.py b/software/source/clients/linux/device.py index 0fa0fed..cf549d5 100644 --- a/software/source/clients/linux/device.py +++ b/software/source/clients/linux/device.py @@ -3,8 +3,9 @@ from ..base_device import Device device = Device() -def main(server_url): +def main(server_url, tts_service): device.server_url = server_url + device.tts_service = tts_service device.start() diff --git a/software/source/clients/mac/device.py b/software/source/clients/mac/device.py index 0fa0fed..cf549d5 100644 --- a/software/source/clients/mac/device.py +++ b/software/source/clients/mac/device.py @@ -3,8 +3,9 @@ from ..base_device import Device device = Device() -def main(server_url): +def main(server_url, tts_service): device.server_url = server_url + device.tts_service = tts_service device.start() diff --git a/software/source/clients/windows/device.py b/software/source/clients/windows/device.py index 0fa0fed..cf549d5 100644 --- a/software/source/clients/windows/device.py +++ b/software/source/clients/windows/device.py @@ -3,8 +3,9 @@ from ..base_device import Device device = Device() -def main(server_url): +def main(server_url, tts_service): device.server_url = server_url + device.tts_service = tts_service device.start() diff --git a/software/source/server/async_interpreter.py b/software/source/server/async_interpreter.py index 1e6473a..a2d78bf 100644 --- a/software/source/server/async_interpreter.py +++ b/software/source/server/async_interpreter.py @@ -10,16 +10,9 @@ """ ### - from pynput import keyboard -from RealtimeTTS import ( - TextToAudioStream, - OpenAIEngine, - CoquiEngine, - ElevenlabsEngine, - SystemEngine, - GTTSEngine, -) + +from RealtimeTTS import TextToAudioStream, CoquiEngine, OpenAIEngine, ElevenlabsEngine from RealtimeSTT import AudioToTextRecorder import time import asyncio @@ -29,9 +22,9 @@ import os class AsyncInterpreter: def __init__(self, interpreter): - self.stt_latency = None - self.tts_latency = None - self.interpreter_latency = None + # self.stt_latency = None + # self.tts_latency = None + # self.interpreter_latency = None self.interpreter = interpreter # STT @@ -45,12 +38,9 @@ class AsyncInterpreter: engine = CoquiEngine() elif self.interpreter.tts == "openai": engine = OpenAIEngine() - elif self.interpreter.tts == "gtts": - engine = GTTSEngine() elif self.interpreter.tts == "elevenlabs": engine = ElevenlabsEngine(api_key=os.environ["ELEVEN_LABS_API_KEY"]) - elif self.interpreter.tts == "system": - engine = SystemEngine() + engine.set_voice("Michael") else: raise ValueError(f"Unsupported TTS engine: {self.interpreter.tts}") self.tts = TextToAudioStream(engine) @@ -112,103 +102,96 @@ class AsyncInterpreter: # print("ADDING TO QUEUE:", chunk) asyncio.create_task(self._add_to_queue(self._output_queue, chunk)) - async def run(self): - """ - Runs OI on the audio bytes submitted to the input. Will add streaming LMC chunks to the _output_queue. - """ + def generate(self, message, start_interpreter): + last_lmc_start_flag = self._last_lmc_start_flag self.interpreter.messages = self.active_chat_messages - self.stt.stop() - # message = self.stt.text() - # print("THE MESSAGE:", message) + # print("message is", message) - # accumulates the input queue message - input_queue = [] - while not self._input_queue.empty(): - input_queue.append(self._input_queue.get()) + for chunk in self.interpreter.chat(message, display=True, stream=True): - # print("INPUT QUEUE:", input_queue) - # message = [i for i in input_queue if i["type"] == "message"][0]["content"] - start_stt = time.time() - message = self.stt.text() - end_stt = time.time() - self.stt_latency = end_stt - start_stt - print("STT LATENCY", self.stt_latency) + if self._last_lmc_start_flag != last_lmc_start_flag: + # self.beeper.stop() + break - def generate(message): - last_lmc_start_flag = self._last_lmc_start_flag - self.interpreter.messages = self.active_chat_messages - print("message is", message) + # self.add_to_output_queue_sync(chunk) # To send text, not just audio - for chunk in self.interpreter.chat(message, display=True, stream=True): + content = chunk.get("content") - if self._last_lmc_start_flag != last_lmc_start_flag: + # Handle message blocks + if chunk.get("type") == "message": + if content: # self.beeper.stop() - break - # self.add_to_output_queue_sync(chunk) # To send text, not just audio + # Experimental: The AI voice sounds better with replacements like these, but it should happen at the TTS layer + # content = content.replace(". ", ". ... ").replace(", ", ", ... ").replace("!", "! ... ").replace("?", "? ... ") + # print("yielding ", content) + yield content + + # Handle code blocks + elif chunk.get("type") == "code": + if "start" in chunk: + # self.beeper.start() + pass + + # Experimental: If the AI wants to type, we should type immediatly + if ( + self.interpreter.messages[-1] + .get("content", "") + .startswith("computer.keyboard.write(") + ): + keyboard.controller.type(content) + self._in_keyboard_write_block = True + if "end" in chunk and self._in_keyboard_write_block: + self._in_keyboard_write_block = False + # (This will make it so it doesn't type twice when the block executes) + if self.interpreter.messages[-1]["content"].startswith( + "computer.keyboard.write(" + ): + self.interpreter.messages[-1]["content"] = ( + "dummy_variable = (" + + self.interpreter.messages[-1]["content"][ + len("computer.keyboard.write(") : + ] + ) + + # Send a completion signal + # end_interpreter = time.time() + # self.interpreter_latency = end_interpreter - start_interpreter + # print("INTERPRETER LATENCY", self.interpreter_latency) + # self.add_to_output_queue_sync({"role": "server","type": "completion", "content": "DONE"}) - content = chunk.get("content") + async def run(self): + """ + Runs OI on the audio bytes submitted to the input. Will add streaming LMC chunks to the _output_queue. + """ + self.interpreter.messages = self.active_chat_messages - # Handle message blocks - if chunk.get("type") == "message": - if content: - # self.beeper.stop() + self.stt.stop() - # Experimental: The AI voice sounds better with replacements like these, but it should happen at the TTS layer - # content = content.replace(". ", ". ... ").replace(", ", ", ... ").replace("!", "! ... ").replace("?", "? ... ") - # print("yielding this", content) - yield content + input_queue = [] + while not self._input_queue.empty(): + input_queue.append(self._input_queue.get()) - # Handle code blocks - elif chunk.get("type") == "code": - if "start" in chunk: - # self.beeper.start() - pass + # start_stt = time.time() + message = self.stt.text() + # end_stt = time.time() + # self.stt_latency = end_stt - start_stt + # print("STT LATENCY", self.stt_latency) - # Experimental: If the AI wants to type, we should type immediatly - if ( - self.interpreter.messages[-1] - .get("content", "") - .startswith("computer.keyboard.write(") - ): - keyboard.controller.type(content) - self._in_keyboard_write_block = True - if "end" in chunk and self._in_keyboard_write_block: - self._in_keyboard_write_block = False - # (This will make it so it doesn't type twice when the block executes) - if self.interpreter.messages[-1]["content"].startswith( - "computer.keyboard.write(" - ): - self.interpreter.messages[-1]["content"] = ( - "dummy_variable = (" - + self.interpreter.messages[-1]["content"][ - len("computer.keyboard.write(") : - ] - ) - - # Send a completion signal - end_interpreter = time.time() - self.interpreter_latency = end_interpreter - start_interpreter - print("INTERPRETER LATENCY", self.interpreter_latency) - # self.add_to_output_queue_sync({"role": "server","type": "completion", "content": "DONE"}) + # print(message) # Feed generate to RealtimeTTS self.add_to_output_queue_sync( {"role": "assistant", "type": "audio", "format": "bytes.wav", "start": True} ) start_interpreter = time.time() - text_iterator = generate(message) + text_iterator = self.generate(message, start_interpreter) self.tts.feed(text_iterator) - self.tts.play_async(on_audio_chunk=self.on_tts_chunk, muted=False) - while True: - if self.tts.is_playing(): - start_tts = time.time() + self.tts.play_async(on_audio_chunk=self.on_tts_chunk, muted=True) - break - await asyncio.sleep(0.1) while True: await asyncio.sleep(0.1) # print("is_playing", self.tts.is_playing()) @@ -221,14 +204,14 @@ class AsyncInterpreter: "end": True, } ) - end_tts = time.time() - self.tts_latency = end_tts - start_tts - print("TTS LATENCY", self.tts_latency) + # end_tts = time.time() + # self.tts_latency = end_tts - self.tts.stream_start_time + # print("TTS LATENCY", self.tts_latency) self.tts.stop() break async def _on_tts_chunk_async(self, chunk): - print(f"Adding chunk to output queue") + # print("adding chunk to queue") await self._add_to_queue(self._output_queue, chunk) def on_tts_chunk(self, chunk): @@ -236,7 +219,5 @@ class AsyncInterpreter: asyncio.run(self._on_tts_chunk_async(chunk)) async def output(self): - print("entering output method") - value = await self._output_queue.get() - print("output method returning") - return value + # print("outputting chunks") + return await self._output_queue.get() diff --git a/software/source/server/async_server.py b/software/source/server/async_server.py index f4fcc3e..b88fa57 100644 --- a/software/source/server/async_server.py +++ b/software/source/server/async_server.py @@ -1,37 +1,39 @@ import asyncio import traceback import json -from fastapi import FastAPI, WebSocket, Header +from fastapi import FastAPI, WebSocket from fastapi.responses import PlainTextResponse from uvicorn import Config, Server +from .i import configure_interpreter from interpreter import interpreter as base_interpreter from starlette.websockets import WebSocketDisconnect from .async_interpreter import AsyncInterpreter from fastapi.middleware.cors import CORSMiddleware from typing import List, Dict, Any -from openai import OpenAI -from pydantic import BaseModel -import argparse import os -# import sentry_sdk - -base_interpreter.system_message = ( - "You are a helpful assistant that can answer questions and help with tasks." -) -base_interpreter.computer.import_computer_api = False -base_interpreter.llm.model = "groq/llama3-8b-8192" -base_interpreter.llm.api_key = os.environ["GROQ_API_KEY"] -base_interpreter.llm.supports_functions = False -base_interpreter.auto_run = True -base_interpreter.tts = "elevenlabs" os.environ["STT_RUNNER"] = "server" os.environ["TTS_RUNNER"] = "server" -async def main(server_host, server_port): - interpreter = AsyncInterpreter(base_interpreter) +async def main(server_host, server_port, tts_service, asynchronous): + if asynchronous: + base_interpreter.system_message = ( + "You are a helpful assistant that can answer questions and help with tasks." + ) + base_interpreter.computer.import_computer_api = False + base_interpreter.llm.model = "groq/llama3-8b-8192" + base_interpreter.llm.api_key = os.environ["GROQ_API_KEY"] + base_interpreter.llm.supports_functions = False + base_interpreter.auto_run = True + base_interpreter.tts = tts_service + interpreter = AsyncInterpreter(base_interpreter) + else: + configured_interpreter = configure_interpreter(base_interpreter) + configured_interpreter.llm.supports_functions = True + configured_interpreter.tts = tts_service + interpreter = AsyncInterpreter(configured_interpreter) app = FastAPI() @@ -93,7 +95,7 @@ async def main(server_host, server_port): async def receive_input(): try: while True: - print("server awaiting input") + # print("server awaiting input") data = await websocket.receive() if isinstance(data, bytes): diff --git a/software/start.py b/software/start.py index 7c5186e..6db7ebb 100644 --- a/software/start.py +++ b/software/start.py @@ -6,6 +6,8 @@ import os import importlib from source.server.tunnel import create_tunnel from source.server.async_server import main + +# from source.server.server import main from source.server.utils.local_mode import select_local_model import signal @@ -63,7 +65,7 @@ def run( 0.8, "--temperature", help="Specify the temperature for generation" ), tts_service: str = typer.Option( - "openai", "--tts-service", help="Specify the TTS service" + "elevenlabs", "--tts-service", help="Specify the TTS service" ), stt_service: str = typer.Option( "openai", "--stt-service", help="Specify the STT service" @@ -75,6 +77,9 @@ def run( mobile: bool = typer.Option( False, "--mobile", help="Toggle server to support mobile app" ), + asynchronous: bool = typer.Option( + False, "--async", help="use interpreter optimized for latency" + ), ): _run( server=server or mobile, @@ -97,6 +102,7 @@ def run( local=local, qr=qr or mobile, mobile=mobile, + asynchronous=asynchronous, ) @@ -116,14 +122,15 @@ def _run( context_window: int = 2048, max_tokens: int = 4096, temperature: float = 0.8, - tts_service: str = "openai", + tts_service: str = "elevenlabs", stt_service: str = "openai", local: bool = False, qr: bool = False, mobile: bool = False, + asynchronous: bool = False, ): if local: - tts_service = "piper" + tts_service = "coqui" # llm_service = "llamafile" stt_service = "local-whisper" select_local_model() @@ -154,6 +161,8 @@ def _run( main( server_host, server_port, + tts_service, + asynchronous, # llm_service, # model, # llm_supports_vision, @@ -161,7 +170,6 @@ def _run( # context_window, # max_tokens, # temperature, - # tts_service, # stt_service, # mobile, ), @@ -180,7 +188,6 @@ def _run( system_type = platform.system() if system_type == "Darwin": # Mac OS client_type = "mac" - print("initiating mac device with base device!!!") elif system_type == "Windows": # Windows System client_type = "windows" elif system_type == "Linux": # Linux System @@ -196,9 +203,10 @@ def _run( module = importlib.import_module( f".clients.{client_type}.device", package="source" ) - # server_url = "0.0.0.0:8000" - client_thread = threading.Thread(target=module.main, args=[server_url]) - print("client thread started") + + client_thread = threading.Thread( + target=module.main, args=[server_url, tts_service] + ) client_thread.start() try: