@ -1,6 +1,7 @@
from dotenv import load_dotenv
load_dotenv ( ) # take environment variables from .env.
import os
import asyncio
import threading
import os
@ -21,7 +22,10 @@ import time
import wave
import tempfile
from datetime import datetime
import cv2
import base64
from interpreter import interpreter # Just for code execution. Maybe we should let people do from interpreter.computer import run?
# In the future, I guess kernel watching code should be elsewhere? Somewhere server / client agnostic?
from . . server . utils . kernel import put_kernel_messages_into_queue
from . . server . utils . get_system_info import get_system_info
from . . server . stt . stt import stt_wav
@ -30,6 +34,11 @@ from ..server.utils.logs import setup_logging
from . . server . utils . logs import logger
setup_logging ( )
from . . utils . accumulator import Accumulator
accumulator = Accumulator ( )
# Configuration for Audio Recording
CHUNK = 1024 # Record in chunks of 1024 samples
FORMAT = pyaudio . paInt16 # 16 bits per sample
@ -38,25 +47,99 @@ RATE = 44100 # Sample rate
RECORDING = False # Flag to control recording state
SPACEBAR_PRESSED = False # Flag to track spacebar press state
# Camera configuration
def env_flag(name, default=False):
    """Read environment variable ``name`` as a boolean.

    ``bool(os.getenv(...))`` is True for every non-empty string, so values
    such as ``"False"`` or ``"0"`` used to switch the feature ON. Here the
    usual "off" spellings are recognised explicitly; any other non-empty
    value still counts as enabled, which keeps previously working settings
    working.

    Args:
        name: Name of the environment variable.
        default: Value returned when the variable is not set at all.

    Returns:
        True or False as described above.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() not in ("", "0", "false", "f", "no", "n", "off")


# Whether the 'c' key captures a camera image (off unless explicitly enabled).
CAMERA_ENABLED = env_flag('CAMERA_ENABLED')
# Device index handed to cv2.VideoCapture.
CAMERA_DEVICE_INDEX = int(os.getenv('CAMERA_DEVICE_INDEX', 0))
# Seconds to wait after opening the camera before taking the real picture.
CAMERA_WARMUP_SECONDS = float(os.getenv('CAMERA_WARMUP_SECONDS', 0))
# Specify OS
# Value returned by get_system_info(), looked up once when the module is imported.
current_platform = get_system_info ( )
# Initialize PyAudio
# Single module-wide PyAudio instance.
p = pyaudio . PyAudio ( )
import asyncio
# Outbound messages (dicts, or raw bytes for audio) are put on this queue;
# Device.message_sender takes them off and writes them to the websocket.
send_queue = queue . Queue ( )
class Device :
def __init__ ( self ) :
self . pressed_keys = set ( )
self . captured_images = [ ]
self . audiosegments = [ ]
def fetch_image_from_camera(self, camera_index=CAMERA_DEVICE_INDEX):
    """Capture one frame from a camera and save it as a PNG in the temp directory.

    On success the file path is appended to ``self.captured_images``. The
    path is recorded only after the file has actually been written, and the
    capture device is released on every exit path, including exceptions.

    Args:
        camera_index: Device index passed to ``cv2.VideoCapture``.

    Returns:
        The path of the saved image, or None if no frame could be read or
        the file could not be written.
    """
    image_path = None
    cap = cv2.VideoCapture(camera_index)
    try:
        ret, frame = cap.read()  # Capture a single frame to initialize the camera

        if CAMERA_WARMUP_SECONDS > 0:
            # Allow camera to warm up, then snap a picture again.
            # This is a workaround for some cameras that don't return a properly
            # exposed picture immediately when they are first turned on.
            time.sleep(CAMERA_WARMUP_SECONDS)
            ret, frame = cap.read()

        if not ret:
            logger.error(f"Error: Couldn't capture an image from camera ({camera_index})")
            return None

        timestamp = datetime.now().strftime('%Y%m%d%H%M%S%f')
        candidate_path = os.path.join(tempfile.gettempdir(), f"01_photo_{timestamp}.png")

        # cv2.imwrite reports failure through its return value; do not queue a
        # path for a file that was never created.
        if not cv2.imwrite(candidate_path, frame):
            logger.error(f"Error: Couldn't write camera image to {candidate_path}")
            return None

        image_path = candidate_path
        self.captured_images.append(image_path)
        logger.info(f"Camera image captured to {image_path}")
        logger.info(f"You now have {len(self.captured_images)} images which will be sent along with your next audio message.")
    finally:
        cap.release()

    return image_path
def encode_image_to_base64(self, image_path):
    """Return the contents of the file at ``image_path`` as base64 text."""
    with open(image_path, "rb") as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode('utf-8')
def add_image_to_send_queue(self, image_path):
    """Put the image at ``image_path`` on the send queue, then delete the file.

    The image is sent as a user message of type "image" with format
    "base64.png" whose content is the base64 text of the file.
    """
    send_queue.put({
        "role": "user",
        "type": "image",
        "format": "base64.png",
        "content": self.encode_image_to_base64(image_path),
    })
    # The queued message carries the data, so the file on disk is no longer needed.
    os.remove(image_path)
def queue_all_captured_images(self):
    """Queue every pending camera image for sending and empty the pending list.

    The pending list is swapped for a fresh one before anything is sent, so:

    * an image that cannot be read or deleted (OSError) is logged and
      skipped instead of aborting the loop and leaving stale paths behind
      that would make every later call fail again;
    * images added to ``self.captured_images`` while this call is running
      are neither iterated mid-loop nor wiped by a trailing clear; they
      stay pending for the next call.
    """
    pending, self.captured_images = self.captured_images, []
    for image_path in pending:
        try:
            self.add_image_to_send_queue(image_path)
        except OSError as error:
            logger.error(f"Error: Couldn't queue camera image {image_path}: {error}")
async def play_audiosegments(self):
    """Play queued audio segments forever, strictly in arrival order.

    Segments are taken from the front of ``self.audiosegments`` one at a
    time. Removing items from a list while iterating over it (as the
    previous loop did) skips every other element, so segments were played
    out of order. Popping before playing also means a segment that fails to
    play is dropped rather than retried endlessly without any pause.
    """
    while True:
        try:
            while self.audiosegments:
                play(self.audiosegments.pop(0))
            await asyncio.sleep(0.1)
        except asyncio.exceptions.CancelledError:
            # This happens once at the start?
            # NOTE(review): cancellation is deliberately ignored here, as in the
            # original, which also means this task cannot be cancelled.
            pass
        except Exception:
            logger.info(traceback.format_exc())
def record_audio ( self ) :
if os . getenv ( ' STT_RUNNER ' ) == " server " :
# STT will happen on the server. we're sending audio.
send_queue . put ( { " role " : " user " , " type " : " audio " , " format " : " audio/wav " , " start " : True } )
send_queue . put ( { " role " : " user " , " type " : " audio " , " format " : " bytes. wav" , " start " : True } )
elif os . getenv ( ' STT_RUNNER ' ) == " client " :
# STT will happen here, on the client. we're sending text.
send_queue . put ( { " role " : " user " , " type " : " message " , " start " : True } )
@ -92,9 +175,11 @@ class Device:
send_queue . put ( { " role " : " user " , " type " : " message " , " content " : " stop " } )
send_queue . put ( { " role " : " user " , " type " : " message " , " end " : True } )
else :
send_queue . put ( { " role " : " user " , " type " : " audio " , " format " : " audio/ wav" , " content " : " " } )
send_queue . put ( { " role " : " user " , " type " : " audio " , " format " : " audio/ wav" , " end " : True } )
send_queue . put ( { " role " : " user " , " type " : " audio " , " format " : " bytes. wav" , " content " : " " } )
send_queue . put ( { " role " : " user " , " type " : " audio " , " format " : " bytes. wav" , " end " : True } )
else :
self . queue_all_captured_images ( )
if os . getenv ( ' STT_RUNNER ' ) == " client " :
# Run stt then send text
text = stt_wav ( wav_path )
@ -105,9 +190,9 @@ class Device:
with open ( wav_path , ' rb ' ) as audio_file :
byte_data = audio_file . read ( CHUNK )
while byte_data :
send_queue . put ( { " role " : " user " , " type " : " audio " , " format " : " audio/wav " , " content " : str ( byte_data ) } )
send_queue . put ( byte_data )
byte_data = audio_file . read ( CHUNK )
send_queue . put ( { " role " : " user " , " type " : " audio " , " format " : " audio/ wav" , " end " : True } )
send_queue . put ( { " role " : " user " , " type " : " audio " , " format " : " bytes. wav" , " end " : True } )
if os . path . exists ( wav_path ) :
os . remove ( wav_path )
@ -125,86 +210,82 @@ class Device:
RECORDING = False
def on_press(self, key):
    """Detect spacebar press and Ctrl+C combination.

    The pressed key is added to ``self.pressed_keys``. While the spacebar is
    held, recording is switched on; otherwise, if both Ctrl and 'c' are
    held, the process exits immediately.

    Args:
        key: The key object for the key that was pressed.
    """
    self.pressed_keys.add(key)  # Add the pressed key to the set

    if keyboard.Key.space in self.pressed_keys:
        self.toggle_recording(True)
    elif {keyboard.Key.ctrl, keyboard.KeyCode.from_char('c')} <= self.pressed_keys:
        logger.info("Ctrl+C pressed. Exiting...")
        os._exit(0)
def on_release(self, key):
    """Detect spacebar release and 'c' key press for camera, and handle key release.

    The key is removed from ``self.pressed_keys``. Releasing the spacebar
    stops recording; releasing 'c' takes a camera picture when
    CAMERA_ENABLED is set.

    Args:
        key: The key object for the key that was released.
    """
    self.pressed_keys.discard(key)  # Remove the released key from the key press tracking set

    if key == keyboard.Key.space:
        self.toggle_recording(False)
    elif CAMERA_ENABLED and key == keyboard.KeyCode.from_char('c'):
        self.fetch_image_from_camera()
async def message_sender(self, websocket):
    """Forward everything placed on ``send_queue`` to ``websocket``, forever.

    ``bytes`` items are sent as-is; anything else is serialised with
    ``json.dumps`` first.

    The queue is polled without blocking instead of parking a worker thread
    in ``send_queue.get``. A parked thread outlives this coroutine when the
    connection drops or the task is cancelled, and then silently consumes
    the next queued message. ``task_done`` is called for every item taken,
    even when the send fails.

    Args:
        websocket: Open connection exposing an awaitable ``send``.
    """
    while True:
        try:
            message = send_queue.get_nowait()
        except queue.Empty:
            await asyncio.sleep(0.01)
            continue

        try:
            if isinstance(message, bytes):
                await websocket.send(message)
            else:
                await websocket.send(json.dumps(message))
        finally:
            send_queue.task_done()

        await asyncio.sleep(0.01)
async def websocket_communication(self, WS_URL):
    """Keep a websocket session with the server alive, reconnecting on failure.

    For each connection a ``message_sender`` task is started to push queued
    outbound messages, and incoming chunks are fed to ``accumulator`` until
    it returns a complete message. Complete audio messages whose format
    starts with "bytes" are decoded and appended to ``self.audiosegments``;
    complete code messages are executed locally when the CODE_RUNNER
    environment variable is "client", and the result is queued for sending.

    The sender task is cancelled whenever the connection ends, so reconnects
    do not pile up senders bound to closed sockets.

    Args:
        WS_URL: Address of the server websocket.
    """
    while True:
        sender_task = None
        try:
            async with websockets.connect(WS_URL) as websocket:
                if CAMERA_ENABLED:
                    logger.info("Press the spacebar to start/stop recording. Press 'c' to capture an image from the camera. Press CTRL-C to exit.")
                else:
                    logger.info("Press the spacebar to start/stop recording. Press CTRL-C to exit.")

                sender_task = asyncio.create_task(self.message_sender(websocket))

                while True:
                    await asyncio.sleep(0.01)
                    chunk = await websocket.recv()

                    logger.debug(f"Got this message from the server: {type(chunk)} {chunk}")

                    if isinstance(chunk, str):
                        chunk = json.loads(chunk)

                    message = accumulator.accumulate(chunk)
                    if message is None:
                        # Will be None until we have a full message ready
                        continue

                    # At this point, we have our message

                    if message["type"] == "audio" and message["format"].startswith("bytes"):
                        # Convert bytes to audio file
                        # Format will be bytes.wav or bytes.opus
                        audio_bytes = io.BytesIO(message["content"])
                        audio = AudioSegment.from_file(audio_bytes, codec=message["format"].split(".")[1])
                        # NOTE(review): this pause is kept from the original; its purpose is not evident here.
                        await asyncio.sleep(1)
                        self.audiosegments.append(audio)

                    # Run the code if that's the client's job
                    if os.getenv('CODE_RUNNER') == "client":
                        if message["type"] == "code" and "end" in message:
                            language = message["format"]
                            code = message["content"]
                            result = interpreter.computer.run(language, code)
                            send_queue.put(result)
        except Exception:
            # Any failure (refused connection, dropped socket, bad message) is
            # logged and followed by a reconnect attempt. Cancellation is not an
            # Exception subclass, so cancelling this task still stops it.
            logger.debug(traceback.format_exc())
            logger.info(f"Connecting to `{WS_URL}`...")
            await asyncio.sleep(2)
        finally:
            if sender_task is not None:
                sender_task.cancel()
@ -221,6 +302,7 @@ class Device:
if os . getenv ( ' CODE_RUNNER ' ) == " client " :
asyncio . create_task ( put_kernel_messages_into_queue ( send_queue ) )
asyncio . create_task ( self . play_audiosegments ( ) )
# If Raspberry Pi, add the button listener, otherwise use the spacebar
if current_platform . startswith ( " raspberry-pi " ) :