@ -4,19 +4,22 @@ import json
import time
import time
import queue
import queue
import os
import os
import traceback
from queue import Queue
from queue import Queue
from threading import Thread
from threading import Thread
import threading
import uvicorn
import uvicorn
import re
import re
from fastapi import FastAPI
from fastapi import FastAPI
from threading import Thread
from threading import Thread
from starlette . websockets import WebSocket
from starlette . websockets import WebSocket
from stt import stt
from stt import stt _bytes
from tts import tts
from tts import tts
from pathlib import Path
from pathlib import Path
import asyncio
import asyncio
from i import configure_interpreter
import urllib . parse
import urllib . parse
from utils . put_kernel_messages_into_queue import put_kernel_messages_into_queue
from i import configure_interpreter
from interpreter import interpreter
from interpreter import interpreter
app = FastAPI ( )
app = FastAPI ( )
@ -30,10 +33,10 @@ def is_full_sentence(text):
def split_into_sentences ( text ) :
def split_into_sentences ( text ) :
return re . split ( r ' (?<=[.!?]) \ s+ ' , text )
return re . split ( r ' (?<=[.!?]) \ s+ ' , text )
# Global q ueues
# Q ueues
receive_queue = queue . Queue ( )
from_computer = queue . Queue ( ) # Just for computer messages from the device. Sync queue because interpreter.run is synchronous
send_queue = queue . Queue ( )
from_user = asyncio . Queue ( ) # Just for user messages from the device.
recieve_computer_queue = queue . Queue ( ) # Just for computer messages from the device
to_device = asyncio . Queue ( ) # For messages we send.
# Switch code executor to device if that's set
# Switch code executor to device if that's set
@ -56,14 +59,14 @@ if os.getenv('CODE_RUNNER') == "device":
# Unless it was just sent to the device, send it wrapped in flags
# Unless it was just sent to the device, send it wrapped in flags
if not ( interpreter . messages and interpreter . messages [ - 1 ] == message ) :
if not ( interpreter . messages and interpreter . messages [ - 1 ] == message ) :
send_queu e. put ( { " role " : " assistant " , " type " : " code " , " format " : " python " , " start " : True } )
to_devic e. put ( { " role " : " assistant " , " type " : " code " , " format " : " python " , " start " : True } )
send_queu e. put ( message )
to_devic e. put ( message )
send_queu e. put ( { " role " : " assistant " , " type " : " code " , " format " : " python " , " end " : True } )
to_devic e. put ( { " role " : " assistant " , " type " : " code " , " format " : " python " , " end " : True } )
# Stream the response
# Stream the response
print ( " Waiting for the device to respond... " )
print ( " Waiting for the device to respond... " )
while True :
while True :
chunk = recieve_computer_queue . get ( )
chunk = from_computer . get ( )
print ( " Server recieved from device: " , chunk )
print ( " Server recieved from device: " , chunk )
if " end " in chunk :
if " end " in chunk :
break
break
@ -87,47 +90,52 @@ async def websocket_endpoint(websocket: WebSocket):
await websocket . accept ( )
await websocket . accept ( )
receive_task = asyncio . create_task ( receive_messages ( websocket ) )
receive_task = asyncio . create_task ( receive_messages ( websocket ) )
send_task = asyncio . create_task ( send_messages ( websocket ) )
send_task = asyncio . create_task ( send_messages ( websocket ) )
try :
await asyncio . gather ( receive_task , send_task )
await asyncio . gather ( receive_task , send_task )
except Exception as e :
traceback . print_exc ( )
print ( f " Connection lost. Error: { e } " )
async def receive_messages ( websocket : WebSocket ) :
async def receive_messages ( websocket : WebSocket ) :
while True :
while True :
data = await websocket . receive_text ( )
data = await websocket . receive_json ( )
if type ( data ) == dict and data [ " role " ] == " computer " :
if data [ " role " ] == " computer " :
recieve_computer_queue . put ( data ) # To be handled by interpreter.computer.run
from_computer . put ( data ) # To be handled by interpreter.computer.run
elif data [ " role " ] == " user " :
await from_user . put ( data )
else :
else :
receive_queue . put ( data )
raise ( " Unknown role: " , data )
async def send_messages ( websocket : WebSocket ) :
async def send_messages ( websocket : WebSocket ) :
while True :
while True :
message = await asyncio. get_event_loop ( ) . run_in_executor ( None , send_queue . get )
message = await to_device. get ( )
print ( message )
print ( message )
await websocket . send_json ( message )
await websocket . send_json ( message )
def queue _listener( ) :
async def user _listener( ) :
audio_ file = bytearray ( )
audio_ bytes = bytearray ( )
while True :
while True :
# Check 10x a second for new messages
message = await from_user . get ( )
while receive_queue . empty ( ) :
time . sleep ( 0.1 )
message = receive_queue . get ( )
message = json . loads ( message )
# Hold the audio in a buffer. If it's ready (we got end flag, stt it)
# Hold the audio in a buffer. If it's ready (we got end flag, stt it)
if message [ " type " ] == " audio " :
if message [ " type " ] == " audio " :
if " content " in message :
if " content " in message :
audio_ file . extend ( bytes ( ast . literal_eval ( message [ " content " ] ) ) )
audio_bytes . extend ( bytes ( ast . literal_eval ( message [ " content " ] ) ) )
if " end " in message :
if " end " in message :
content = stt ( audio_file , message [ " format " ] )
content = stt _bytes( audio_bytes , message [ " format " ] )
if content == None : # If it was nothing / silence
if content == None : # If it was nothing / silence
continue
continue
audio_ file = bytearray ( )
audio_ bytes = bytearray ( )
message = { " role " : " user " , " type " : " message " , " content " : content }
message = { " role " : " user " , " type " : " message " , " content " : content }
else :
else :
continue
continue
# Ignore flags, we only needed them for audio ^
if " content " not in message :
continue
# Custom stop message will halt us
# Custom stop message will halt us
if message . get ( " content " ) and message . get ( " content " ) . lower ( ) . strip ( " .,! " ) == " stop " :
if message [ " content " ] . lower ( ) . strip ( " .,! " ) == " stop " :
continue
continue
# Load, append, and save conversation history
# Load, append, and save conversation history
@ -142,53 +150,59 @@ def queue_listener():
for chunk in interpreter . chat ( messages , stream = True ) :
for chunk in interpreter . chat ( messages , stream = True ) :
# Send it to the user
# Send it to the user
send_queu e. put ( chunk )
await to_devic e. put ( chunk )
# Speak full sentences out loud
# Speak full sentences out loud
if chunk [ " role " ] == " assistant " and " content " in chunk :
if chunk [ " role " ] == " assistant " and " content " in chunk :
print ( " Chunk role is assistant and content is present in chunk. " )
accumulated_text + = chunk [ " content " ]
accumulated_text + = chunk [ " content " ]
print ( " Accumulated text: " , accumulated_text )
sentences = split_into_sentences ( accumulated_text )
sentences = split_into_sentences ( accumulated_text )
print ( " Sentences after splitting: " , sentences )
if is_full_sentence ( sentences [ - 1 ] ) :
if is_full_sentence ( sentences [ - 1 ] ) :
print ( " Last sentence is a full sentence. " )
for sentence in sentences :
for sentence in sentences :
print ( " Streaming sentence: " , sentence )
await stream_or_play_tts ( sentence )
stream_tts_to_user ( sentence )
accumulated_text = " "
accumulated_text = " "
print ( " Reset accumulated text. " )
else :
else :
print ( " Last sentence is not a full sentence. " )
for sentence in sentences [ : - 1 ] :
for sentence in sentences [ : - 1 ] :
print ( " Streaming sentence: " , sentence )
await stream_or_play_tts ( sentence )
stream_tts_to_user ( sentence )
accumulated_text = sentences [ - 1 ]
accumulated_text = sentences [ - 1 ]
print ( " Accumulated text is now the last sentence: " , accumulated_text )
# If we have a new message, save our progress and go back to the top
# If we have a new message, save our progress and go back to the top
if not receive_queue . empty ( ) :
if not from_user . empty ( ) :
with open ( conversation_history_path , ' w ' ) as file :
with open ( conversation_history_path , ' w ' ) as file :
json . dump ( interpreter . messages , file )
json . dump ( interpreter . messages , file )
break
break
def stream_tts_to_user ( sentence ) :
async def stream_or_play_tts ( sentence ) :
send_queue . put ( { " role " : " assistant " , " type " : " audio " , " format " : " audio/mp3 " , " start " : True } )
audio_bytes = tts ( sentence )
if os . getenv ( ' TTS_RUNNER ' ) == " server " :
send_queue . put ( { " role " : " assistant " , " type " : " audio " , " format " : " audio/mp3 " , " content " : str ( audio_bytes ) } )
tts ( sentence , play_audio = True )
send_queue . put ( { " role " : " assistant " , " type " : " audio " , " format " : " audio/mp3 " , " end " : True } )
else :
await to_device . put ( { " role " : " assistant " , " type " : " audio " , " format " : " audio/mp3 " , " start " : True } )
audio_bytes = tts ( sentence , play_audio = False )
await to_device . put ( { " role " : " assistant " , " type " : " audio " , " format " : " audio/mp3 " , " content " : str ( audio_bytes ) } )
await to_device . put ( { " role " : " assistant " , " type " : " audio " , " format " : " audio/mp3 " , " end " : True } )
# Create a thread for the queue listener
queue_thread = Thread ( target = queue_listener )
# Start the queue listener thread
from uvicorn import Config , Server
queue_thread . start ( )
# Run the FastAPI app
# Run the FastAPI app
if __name__ == " __main__ " :
if __name__ == " __main__ " :
async def main ( ) :
# Start listening to the user
asyncio . create_task ( user_listener ( ) )
# Start watching the kernel if it's your job to do that
if os . getenv ( ' CODE_RUNNER ' ) == " server " :
asyncio . create_task ( put_kernel_messages_into_queue ( from_computer ) )
server_url = os . getenv ( ' SERVER_URL ' )
server_url = os . getenv ( ' SERVER_URL ' )
if not server_url :
if not server_url :
raise ValueError ( " The environment variable SERVER_URL is not set. Please set it to proceed. " )
raise ValueError ( " The environment variable SERVER_URL is not set. Please set it to proceed. " )
parsed_url = urllib . parse . urlparse ( server_url )
parsed_url = urllib . parse . urlparse ( server_url )
print ( " Starting `server.py`... " )
print ( " Starting `server.py`... " )
uvicorn . run ( app , host = parsed_url . hostname , port = parsed_url . port )
config = Config ( app , host = parsed_url . hostname , port = parsed_url . port , lifespan = ' on ' )
server = Server ( config )
await server . serve ( )
asyncio . run ( main ( ) )