You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
01/software/source/server/archive_async_interpreter.py

253 lines
8.9 KiB

# This is a websocket interpreter, TTS and STT disabled.
# It makes a websocket on port 8000 that sends/recieves LMC messages in *streaming* format.
### You MUST send a start and end flag with each message! For example: ###
"""
{"role": "user", "type": "message", "start": True})
{"role": "user", "type": "message", "content": "hi"})
{"role": "user", "type": "message", "end": True})
"""
###
from pynput import keyboard
from .utils.bytes_to_wav import bytes_to_wav
from RealtimeTTS import TextToAudioStream, CoquiEngine, OpenAIEngine, ElevenlabsEngine
from RealtimeSTT import AudioToTextRecorder
import time
import asyncio
import json
import os
class AsyncInterpreter:
def __init__(self, interpreter, debug):
self.stt_latency = None
self.tts_latency = None
self.interpreter_latency = None
# time from first put to first yield
self.tffytfp = None
self.debug = debug
self.interpreter = interpreter
self.audio_chunks = []
# STT
self.stt = AudioToTextRecorder(
model="tiny.en", spinner=False, use_microphone=False
)
self.stt.stop() # It needs this for some reason
# TTS
if self.interpreter.tts == "coqui":
engine = CoquiEngine()
elif self.interpreter.tts == "openai":
engine = OpenAIEngine()
elif self.interpreter.tts == "elevenlabs":
engine = ElevenlabsEngine(api_key=os.environ["ELEVEN_LABS_API_KEY"])
engine.set_voice("Michael")
else:
raise ValueError(f"Unsupported TTS engine: {self.interpreter.tts}")
self.tts = TextToAudioStream(engine)
self.active_chat_messages = []
self._input_queue = asyncio.Queue() # Queue that .input will shove things into
self._output_queue = asyncio.Queue() # Queue to put output chunks into
self._last_lmc_start_flag = None # Unix time of last LMC start flag recieved
self._in_keyboard_write_block = (
False # Tracks whether interpreter is trying to use the keyboard
)
self.loop = asyncio.get_event_loop()
async def _add_to_queue(self, queue, item):
await queue.put(item)
async def clear_queue(self, queue):
while not queue.empty():
await queue.get()
async def clear_input_queue(self):
await self.clear_queue(self._input_queue)
async def clear_output_queue(self):
await self.clear_queue(self._output_queue)
async def input(self, chunk):
"""
Expects a chunk in streaming LMC format.
"""
if isinstance(chunk, bytes):
# It's probably a chunk of audio
self.stt.feed_audio(chunk)
self.audio_chunks.append(chunk)
# print("INTERPRETER FEEDING AUDIO")
else:
try:
chunk = json.loads(chunk)
except:
pass
if "start" in chunk:
# print("Starting STT")
self.stt.start()
self._last_lmc_start_flag = time.time()
# self.interpreter.computer.terminal.stop() # Stop any code execution... maybe we should make interpreter.stop()?
elif "end" in chunk:
# print("Running OI on input")
asyncio.create_task(self.run())
else:
await self._add_to_queue(self._input_queue, chunk)
def add_to_output_queue_sync(self, chunk):
"""
Synchronous function to add a chunk to the output queue.
"""
# print("ADDING TO QUEUE:", chunk)
asyncio.create_task(self._add_to_queue(self._output_queue, chunk))
def generate(self, message, start_interpreter):
last_lmc_start_flag = self._last_lmc_start_flag
self.interpreter.messages = self.active_chat_messages
# print("message is", message)
for chunk in self.interpreter.chat(message, display=True, stream=True):
if self._last_lmc_start_flag != last_lmc_start_flag:
# self.beeper.stop()
break
# self.add_to_output_queue_sync(chunk) # To send text, not just audio
content = chunk.get("content")
# Handle message blocks
if chunk.get("type") == "message":
if content:
# self.beeper.stop()
# Experimental: The AI voice sounds better with replacements like these, but it should happen at the TTS layer
# content = content.replace(". ", ". ... ").replace(", ", ", ... ").replace("!", "! ... ").replace("?", "? ... ")
# print("yielding ", content)
if self.tffytfp is None:
self.tffytfp = time.time()
yield content
# Handle code blocks
elif chunk.get("type") == "code":
if "start" in chunk:
# self.beeper.start()
pass
# Experimental: If the AI wants to type, we should type immediatly
if (
self.interpreter.messages[-1]
.get("content", "")
.startswith("computer.keyboard.write(")
):
keyboard.controller.type(content)
self._in_keyboard_write_block = True
if "end" in chunk and self._in_keyboard_write_block:
self._in_keyboard_write_block = False
# (This will make it so it doesn't type twice when the block executes)
if self.interpreter.messages[-1]["content"].startswith(
"computer.keyboard.write("
):
self.interpreter.messages[-1]["content"] = (
"dummy_variable = ("
+ self.interpreter.messages[-1]["content"][
len("computer.keyboard.write(") :
]
)
# Send a completion signal
if self.debug:
end_interpreter = time.time()
self.interpreter_latency = end_interpreter - start_interpreter
print("INTERPRETER LATENCY", self.interpreter_latency)
# self.add_to_output_queue_sync({"role": "server","type": "completion", "content": "DONE"})
async def run(self):
"""
Runs OI on the audio bytes submitted to the input. Will add streaming LMC chunks to the _output_queue.
"""
self.interpreter.messages = self.active_chat_messages
self.stt.stop()
input_queue = []
while not self._input_queue.empty():
input_queue.append(self._input_queue.get())
if self.debug:
start_stt = time.time()
message = self.stt.text()
end_stt = time.time()
self.stt_latency = end_stt - start_stt
print("STT LATENCY", self.stt_latency)
if self.audio_chunks:
audio_bytes = bytearray(b"".join(self.audio_chunks))
wav_file_path = bytes_to_wav(audio_bytes, "audio/raw")
print("wav_file_path ", wav_file_path)
self.audio_chunks = []
else:
message = self.stt.text()
print(message)
# Feed generate to RealtimeTTS
self.add_to_output_queue_sync(
{"role": "assistant", "type": "audio", "format": "bytes.wav", "start": True}
)
start_interpreter = time.time()
text_iterator = self.generate(message, start_interpreter)
self.tts.feed(text_iterator)
if not self.tts.is_playing():
self.tts.play_async(on_audio_chunk=self.on_tts_chunk, muted=True)
while True:
await asyncio.sleep(0.1)
# print("is_playing", self.tts.is_playing())
if not self.tts.is_playing():
self.add_to_output_queue_sync(
{
"role": "assistant",
"type": "audio",
"format": "bytes.wav",
"end": True,
}
)
if self.debug:
end_tts = time.time()
self.tts_latency = end_tts - self.tts.stream_start_time
print("TTS LATENCY", self.tts_latency)
self.tts.stop()
break
async def _on_tts_chunk_async(self, chunk):
# print("adding chunk to queue")
if self.debug and self.tffytfp is not None and self.tffytfp != 0:
print(
"time from first yield to first put is ",
time.time() - self.tffytfp,
)
self.tffytfp = 0
await self._add_to_queue(self._output_queue, chunk)
def on_tts_chunk(self, chunk):
# print("ye")
asyncio.run(self._on_tts_chunk_async(chunk))
async def output(self):
# print("outputting chunks")
return await self._output_queue.get()