@ -1,9 +1,9 @@
from dotenv import load_dotenv
from dotenv import load_dotenv
load_dotenv ( ) # take environment variables from .env.
load_dotenv ( ) # take environment variables from .env.
import traceback
import traceback
from platformdirs import user_data_dir
from platformdirs import user_data_dir
import ast
import json
import json
import queue
import queue
import os
import os
@ -13,9 +13,7 @@ import re
from fastapi import FastAPI , Request
from fastapi import FastAPI , Request
from fastapi . responses import PlainTextResponse
from fastapi . responses import PlainTextResponse
from starlette . websockets import WebSocket , WebSocketDisconnect
from starlette . websockets import WebSocket , WebSocketDisconnect
from pathlib import Path
import asyncio
import asyncio
import urllib . parse
from . utils . kernel import put_kernel_messages_into_queue
from . utils . kernel import put_kernel_messages_into_queue
from . i import configure_interpreter
from . i import configure_interpreter
from interpreter import interpreter
from interpreter import interpreter
@ -44,28 +42,31 @@ accumulator = Accumulator()
# FastAPI application and on-disk paths for persisted conversation state.
app = FastAPI()

# Per-user application data directory (platformdirs), e.g. ~/.local/share/01 on Linux.
app_dir = user_data_dir("01")
conversation_history_path = os.path.join(app_dir, "conversations", "user.json")

# Port the server listens on locally; overridable via the SERVER_LOCAL_PORT env var.
SERVER_LOCAL_PORT = int(os.getenv("SERVER_LOCAL_PORT", 10001))
# This is so we only say() full sentences
def is_full_sentence(text):
    """Return True if *text* ends with sentence-final punctuation (., ! or ?)."""
    return text.endswith((".", "!", "?"))
def split_into_sentences(text):
    """Split *text* into sentences on whitespace that follows ., ! or ?.

    The lookbehind keeps the punctuation attached to the preceding sentence.
    """
    return re.split(r"(?<=[.!?])\s+", text)
# Queues
from_computer = (
    queue.Queue()
)  # Just for computer messages from the device. Sync queue because interpreter.run is synchronous
from_user = asyncio.Queue()  # Just for user messages from the device.
to_device = asyncio.Queue()  # For messages we send.
# Switch code executor to device if that's set
# Switch code executor to device if that's set
if os . getenv ( ' CODE_RUNNER ' ) == " device " :
if os . getenv ( " CODE_RUNNER " ) == " device " :
# (This should probably just loop through all languages and apply these changes instead)
# (This should probably just loop through all languages and apply these changes instead)
class Python :
class Python :
@ -79,13 +80,32 @@ if os.getenv('CODE_RUNNER') == "device":
""" Generator that yields a dictionary in LMC Format. """
""" Generator that yields a dictionary in LMC Format. """
# Prepare the data
# Prepare the data
message = { " role " : " assistant " , " type " : " code " , " format " : " python " , " content " : code }
message = {
" role " : " assistant " ,
" type " : " code " ,
" format " : " python " ,
" content " : code ,
}
# Unless it was just sent to the device, send it wrapped in flags
# Unless it was just sent to the device, send it wrapped in flags
if not ( interpreter . messages and interpreter . messages [ - 1 ] == message ) :
if not ( interpreter . messages and interpreter . messages [ - 1 ] == message ) :
to_device . put ( { " role " : " assistant " , " type " : " code " , " format " : " python " , " start " : True } )
to_device . put (
{
" role " : " assistant " ,
" type " : " code " ,
" format " : " python " ,
" start " : True ,
}
)
to_device . put ( message )
to_device . put ( message )
to_device . put ( { " role " : " assistant " , " type " : " code " , " format " : " python " , " end " : True } )
to_device . put (
{
" role " : " assistant " ,
" type " : " code " ,
" format " : " python " ,
" end " : True ,
}
)
# Stream the response
# Stream the response
logger . info ( " Waiting for the device to respond... " )
logger . info ( " Waiting for the device to respond... " )
@ -109,10 +129,12 @@ if os.getenv('CODE_RUNNER') == "device":
# Configure interpreter
# Configure interpreter
interpreter = configure_interpreter ( interpreter )
interpreter = configure_interpreter ( interpreter )
@app.get("/ping")
async def ping():
    """Health-check endpoint; returns plain-text 'pong'."""
    return PlainTextResponse("pong")
@app.websocket ( " / " )
@app.websocket ( " / " )
async def websocket_endpoint ( websocket : WebSocket ) :
async def websocket_endpoint ( websocket : WebSocket ) :
await websocket . accept ( )
await websocket . accept ( )
@ -145,19 +167,21 @@ async def receive_messages(websocket: WebSocket):
except Exception as e :
except Exception as e :
print ( str ( e ) )
print ( str ( e ) )
return
return
if ' text ' in data :
if " text " in data :
try :
try :
data = json . loads ( data [ ' text ' ] )
data = json . loads ( data [ " text " ] )
if data [ " role " ] == " computer " :
if data [ " role " ] == " computer " :
from_computer . put ( data ) # To be handled by interpreter.computer.run
from_computer . put (
data
) # To be handled by interpreter.computer.run
elif data [ " role " ] == " user " :
elif data [ " role " ] == " user " :
await from_user . put ( data )
await from_user . put ( data )
else :
else :
raise ( " Unknown role: " , data )
raise ( " Unknown role: " , data )
except json . JSONDecodeError :
except json . JSONDecodeError :
pass # data is not JSON, leave it as is
pass # data is not JSON, leave it as is
elif ' bytes ' in data :
elif " bytes " in data :
data = data [ ' bytes ' ] # binary data
data = data [ " bytes " ] # binary data
await from_user . put ( data )
await from_user . put ( data )
except WebSocketDisconnect as e :
except WebSocketDisconnect as e :
if e . code == 1000 :
if e . code == 1000 :
@ -184,8 +208,8 @@ async def send_messages(websocket: WebSocket):
await to_device . put ( message )
await to_device . put ( message )
raise
raise
async def listener ( ) :
async def listener ( ) :
while True :
while True :
try :
try :
while True :
while True :
@ -197,8 +221,6 @@ async def listener():
break
break
await asyncio . sleep ( 1 )
await asyncio . sleep ( 1 )
message = accumulator . accumulate ( chunk )
message = accumulator . accumulate ( chunk )
if message == None :
if message == None :
# Will be None until we have a full message ready
# Will be None until we have a full message ready
@ -209,8 +231,11 @@ async def listener():
# At this point, we have our message
# At this point, we have our message
if message [ " type " ] == " audio " and message [ " format " ] . startswith ( " bytes " ) :
if message [ " type " ] == " audio " and message [ " format " ] . startswith ( " bytes " ) :
if (
if " content " not in message or message [ " content " ] == None or message [ " content " ] == " " : # If it was nothing / silence / empty
" content " not in message
or message [ " content " ] == None
or message [ " content " ] == " "
) : # If it was nothing / silence / empty
continue
continue
# Convert bytes to audio file
# Convert bytes to audio file
@ -222,6 +247,7 @@ async def listener():
if False :
if False :
os . system ( f " open { audio_file_path } " )
os . system ( f " open { audio_file_path } " )
import time
import time
time . sleep ( 15 )
time . sleep ( 15 )
text = stt ( audio_file_path )
text = stt ( audio_file_path )
@ -239,21 +265,21 @@ async def listener():
continue
continue
# Load, append, and save conversation history
# Load, append, and save conversation history
with open ( conversation_history_path , ' r ' ) as file :
with open ( conversation_history_path , " r " ) as file :
messages = json . load ( file )
messages = json . load ( file )
messages . append ( message )
messages . append ( message )
with open ( conversation_history_path , ' w ' ) as file :
with open ( conversation_history_path , " w " ) as file :
json . dump ( messages , file , indent = 4 )
json . dump ( messages , file , indent = 4 )
accumulated_text = " "
accumulated_text = " "
if any (
if any ( [ m [ " type " ] == " image " for m in messages ] ) and interpreter . llm . model . startswith ( " gpt- " ) :
[ m [ " type " ] == " image " for m in messages ]
) and interpreter . llm . model . startswith ( " gpt- " ) :
interpreter . llm . model = " gpt-4-vision-preview "
interpreter . llm . model = " gpt-4-vision-preview "
interpreter . llm . supports_vision = True
interpreter . llm . supports_vision = True
for chunk in interpreter . chat ( messages , stream = True , display = True ) :
for chunk in interpreter . chat ( messages , stream = True , display = True ) :
if any ( [ m [ " type " ] == " image " for m in interpreter . messages ] ) :
if any ( [ m [ " type " ] == " image " for m in interpreter . messages ] ) :
interpreter . llm . model = " gpt-4-vision-preview "
interpreter . llm . model = " gpt-4-vision-preview "
@ -264,16 +290,22 @@ async def listener():
# Yield to the event loop, so you actually send it out
# Yield to the event loop, so you actually send it out
await asyncio . sleep ( 0.01 )
await asyncio . sleep ( 0.01 )
if os . getenv ( ' TTS_RUNNER ' ) == " server " :
if os . getenv ( " TTS_RUNNER " ) == " server " :
# Speak full sentences out loud
# Speak full sentences out loud
if chunk [ " role " ] == " assistant " and " content " in chunk and chunk [ " type " ] == " message " :
if (
chunk [ " role " ] == " assistant "
and " content " in chunk
and chunk [ " type " ] == " message "
) :
accumulated_text + = chunk [ " content " ]
accumulated_text + = chunk [ " content " ]
sentences = split_into_sentences ( accumulated_text )
sentences = split_into_sentences ( accumulated_text )
# If we're going to speak, say we're going to stop sending text.
# If we're going to speak, say we're going to stop sending text.
# This should be fixed probably, we should be able to do both in parallel, or only one.
# This should be fixed probably, we should be able to do both in parallel, or only one.
if any ( is_full_sentence ( sentence ) for sentence in sentences ) :
if any ( is_full_sentence ( sentence ) for sentence in sentences ) :
await to_device . put ( { " role " : " assistant " , " type " : " message " , " end " : True } )
await to_device . put (
{ " role " : " assistant " , " type " : " message " , " end " : True }
)
if is_full_sentence ( sentences [ - 1 ] ) :
if is_full_sentence ( sentences [ - 1 ] ) :
for sentence in sentences :
for sentence in sentences :
@ -287,22 +319,27 @@ async def listener():
# If we're going to speak, say we're going to stop sending text.
# If we're going to speak, say we're going to stop sending text.
# This should be fixed probably, we should be able to do both in parallel, or only one.
# This should be fixed probably, we should be able to do both in parallel, or only one.
if any ( is_full_sentence ( sentence ) for sentence in sentences ) :
if any ( is_full_sentence ( sentence ) for sentence in sentences ) :
await to_device . put ( { " role " : " assistant " , " type " : " message " , " start " : True } )
await to_device . put (
{ " role " : " assistant " , " type " : " message " , " start " : True }
)
# If we have a new message, save our progress and go back to the top
# If we have a new message, save our progress and go back to the top
if not from_user . empty ( ) :
if not from_user . empty ( ) :
# Check if it's just an end flag. We ignore those.
# Check if it's just an end flag. We ignore those.
temp_message = await from_user . get ( )
temp_message = await from_user . get ( )
if type ( temp_message ) is dict and temp_message . get ( " role " ) == " user " and temp_message . get ( " end " ) :
if (
type ( temp_message ) is dict
and temp_message . get ( " role " ) == " user "
and temp_message . get ( " end " )
) :
# Yup. False alarm.
# Yup. False alarm.
continue
continue
else :
else :
# Whoops! Put that back
# Whoops! Put that back
await from_user . put ( temp_message )
await from_user . put ( temp_message )
with open ( conversation_history_path , ' w ' ) as file :
with open ( conversation_history_path , " w " ) as file :
json . dump ( interpreter . messages , file , indent = 4 )
json . dump ( interpreter . messages , file , indent = 4 )
# TODO: is triggering seemingly randomly
# TODO: is triggering seemingly randomly
@ -311,8 +348,7 @@ async def listener():
# Also check if there's any new computer messages
# Also check if there's any new computer messages
if not from_computer . empty ( ) :
if not from_computer . empty ( ) :
with open ( conversation_history_path , " w " ) as file :
with open ( conversation_history_path , ' w ' ) as file :
json . dump ( interpreter . messages , file , indent = 4 )
json . dump ( interpreter . messages , file , indent = 4 )
logger . info ( " New computer message recieved. Breaking. " )
logger . info ( " New computer message recieved. Breaking. " )
@ -320,6 +356,7 @@ async def listener():
except :
except :
traceback . print_exc ( )
traceback . print_exc ( )
async def stream_tts_to_device ( sentence ) :
async def stream_tts_to_device ( sentence ) :
force_task_completion_responses = [
force_task_completion_responses = [
" the task is done " ,
" the task is done " ,
@ -332,8 +369,8 @@ async def stream_tts_to_device(sentence):
for chunk in stream_tts ( sentence ) :
for chunk in stream_tts ( sentence ) :
await to_device . put ( chunk )
await to_device . put ( chunk )
def stream_tts ( sentence ) :
def stream_tts ( sentence ) :
audio_file = tts ( sentence )
audio_file = tts ( sentence )
with open ( audio_file , " rb " ) as f :
with open ( audio_file , " rb " ) as f :
@ -350,64 +387,84 @@ def stream_tts(sentence):
yield chunk
yield chunk
yield { " role " : " assistant " , " type " : " audio " , " format " : file_type , " end " : True }
yield { " role " : " assistant " , " type " : " audio " , " format " : file_type , " end " : True }
from uvicorn import Config , Server
from uvicorn import Config, Server
import os  # NOTE(review): re-import; os is already imported at the top of the file
import platform
from importlib import import_module

# these will be overwritten (by main() before the server starts)
HOST = ""
PORT = 0
@app.on_event("startup")
async def startup_event():
    """Print a ready banner once the server has started."""
    # NOTE(review): server_url is built but never used — confirm whether it
    # was meant to be printed alongside the banner.
    server_url = f"{HOST}:{PORT}"
    print("")
    print_markdown("\n*Ready.*\n")
    print("")
@app.on_event("shutdown")
async def shutdown_event():
    """Announce shutdown on the console."""
    print_markdown("*Server is shutting down*")
async def main ( server_host , server_port , llm_service , model , llm_supports_vision , llm_supports_functions , context_window , max_tokens , temperature , tts_service , stt_service ) :
async def main (
server_host ,
server_port ,
llm_service ,
model ,
llm_supports_vision ,
llm_supports_functions ,
context_window ,
max_tokens ,
temperature ,
tts_service ,
stt_service ,
) :
global HOST
global HOST
global PORT
global PORT
PORT = server_port
PORT = server_port
HOST = server_host
HOST = server_host
# Setup services
# Setup services
application_directory = user_data_dir ( ' 01 ' )
application_directory = user_data_dir ( " 01 " )
services_directory = os . path . join ( application_directory , ' services ' )
services_directory = os . path . join ( application_directory , " services " )
service_dict = { ' llm ' : llm_service , ' tts ' : tts_service , ' stt ' : stt_service }
service_dict = { " llm " : llm_service , " tts " : tts_service , " stt " : stt_service }
# Create a temp file with the session number
# Create a temp file with the session number
session_file_path = os . path . join ( user_data_dir ( ' 01 ' ) , ' 01-session.txt ' )
session_file_path = os . path . join ( user_data_dir ( " 01 " ) , " 01-session.txt " )
with open ( session_file_path , ' w ' ) as session_file :
with open ( session_file_path , " w " ) as session_file :
session_id = int ( datetime . datetime . now ( ) . timestamp ( ) * 1000 )
session_id = int ( datetime . datetime . now ( ) . timestamp ( ) * 1000 )
session_file . write ( str ( session_id ) )
session_file . write ( str ( session_id ) )
for service in service_dict :
for service in service_dict :
service_directory = os . path . join (
service_directory = os . path . join ( services_directory , service , service_dict [ service ] )
services_directory , service , service_dict [ service ]
)
# This is the folder they can mess around in
# This is the folder they can mess around in
config = { " service_directory " : service_directory }
config = { " service_directory " : service_directory }
if service == " llm " :
if service == " llm " :
config . update ( {
config . update (
{
" interpreter " : interpreter ,
" interpreter " : interpreter ,
" model " : model ,
" model " : model ,
" llm_supports_vision " : llm_supports_vision ,
" llm_supports_vision " : llm_supports_vision ,
" llm_supports_functions " : llm_supports_functions ,
" llm_supports_functions " : llm_supports_functions ,
" context_window " : context_window ,
" context_window " : context_window ,
" max_tokens " : max_tokens ,
" max_tokens " : max_tokens ,
" temperature " : temperature
" temperature " : temperature ,
} )
}
)
module = import_module ( f ' .server.services. { service } . { service_dict [ service ] } . { service } ' , package = ' source ' )
module = import_module (
f " .server.services. { service } . { service_dict [ service ] } . { service } " ,
package = " source " ,
)
ServiceClass = getattr ( module , service . capitalize ( ) )
ServiceClass = getattr ( module , service . capitalize ( ) )
service_instance = ServiceClass ( config )
service_instance = ServiceClass ( config )
@ -422,10 +479,11 @@ async def main(server_host, server_port, llm_service, model, llm_supports_vision
if True : # in the future, code can run on device. for now, just server.
if True : # in the future, code can run on device. for now, just server.
asyncio . create_task ( put_kernel_messages_into_queue ( from_computer ) )
asyncio . create_task ( put_kernel_messages_into_queue ( from_computer ) )
config = Config ( app , host = server_host , port = int ( server_port ) , lifespan = ' on ' )
config = Config ( app , host = server_host , port = int ( server_port ) , lifespan = " on " )
server = Server ( config )
server = Server ( config )
await server . serve ( )
await server . serve ( )
# Run the FastAPI app
if __name__ == "__main__":
    # NOTE(review): main() is declared above with many required parameters
    # (server_host, server_port, ...); calling it with no arguments here would
    # raise TypeError — confirm the intended direct-run entry point.
    asyncio.run(main())