@ -1,6 +1,7 @@
from dotenv import load_dotenv
from dotenv import load_dotenv
load_dotenv ( ) # take environment variables from .env.
load_dotenv ( ) # take environment variables from .env.
import os
import asyncio
import asyncio
import threading
import threading
import os
import os
@ -21,6 +22,8 @@ import time
import wave
import wave
import tempfile
import tempfile
from datetime import datetime
from datetime import datetime
import cv2
import base64
from interpreter import interpreter # Just for code execution. Maybe we should let people do from interpreter.computer import run?
from interpreter import interpreter # Just for code execution. Maybe we should let people do from interpreter.computer import run?
# In the future, I guess kernel watching code should be elsewhere? Somewhere server / client agnostic?
# In the future, I guess kernel watching code should be elsewhere? Somewhere server / client agnostic?
from . . server . utils . kernel import put_kernel_messages_into_queue
from . . server . utils . kernel import put_kernel_messages_into_queue
@ -44,6 +47,11 @@ RATE = 44100 # Sample rate
RECORDING = False # Flag to control recording state
RECORDING = False # Flag to control recording state
SPACEBAR_PRESSED = False # Flag to track spacebar press state
SPACEBAR_PRESSED = False # Flag to track spacebar press state
# Camera configuration
CAMERA_ENABLED = bool ( os . getenv ( ' CAMERA_ENABLED ' , False ) )
CAMERA_DEVICE_INDEX = int ( os . getenv ( ' CAMERA_DEVICE_INDEX ' , 0 ) )
CAMERA_WARMUP_SECONDS = float ( os . getenv ( ' CAMERA_WARMUP_SECONDS ' , 0 ) )
# Specify OS
# Specify OS
current_platform = get_system_info ( )
current_platform = get_system_info ( )
@ -54,9 +62,64 @@ send_queue = queue.Queue()
class Device :
class Device :
def __init__ ( self ) :
def __init__ ( self ) :
self . pressed_keys = set ( )
self . captured_images = [ ]
self . audiosegments = [ ]
self . audiosegments = [ ]
pass
def fetch_image_from_camera ( self , camera_index = CAMERA_DEVICE_INDEX ) :
""" Captures an image from the specified camera device and saves it to a temporary file. Adds the image to the captured_images list. """
image_path = None
cap = cv2 . VideoCapture ( camera_index )
ret , frame = cap . read ( ) # Capture a single frame to initialize the camera
if CAMERA_WARMUP_SECONDS > 0 :
# Allow camera to warm up, then snap a picture again
# This is a workaround for some cameras that don't return a properly exposed
# picture immediately when they are first turned on
time . sleep ( CAMERA_WARMUP_SECONDS )
ret , frame = cap . read ( )
if ret :
temp_dir = tempfile . gettempdir ( )
image_path = os . path . join ( temp_dir , f " 01_photo_ { datetime . now ( ) . strftime ( ' % Y % m %d % H % M % S %f ' ) } .png " )
self . captured_images . append ( image_path )
cv2 . imwrite ( image_path , frame )
logger . info ( f " Camera image captured to { image_path } " )
logger . info ( f " You now have { len ( self . captured_images ) } images which will be sent along with your next audio message. " )
else :
logger . error ( f " Error: Couldn ' t capture an image from camera ( { camera_index } ) " )
cap . release ( )
return image_path
def encode_image_to_base64 ( self , image_path ) :
""" Encodes an image file to a base64 string. """
with open ( image_path , " rb " ) as image_file :
return base64 . b64encode ( image_file . read ( ) ) . decode ( ' utf-8 ' )
def add_image_to_send_queue ( self , image_path ) :
""" Encodes an image and adds an LMC message to the send queue with the image data. """
base64_image = self . encode_image_to_base64 ( image_path )
image_message = {
" role " : " user " ,
" type " : " image " ,
" format " : " base64.png " ,
" content " : base64_image
}
send_queue . put ( image_message )
# Delete the image file from the file system after sending it
os . remove ( image_path )
def queue_all_captured_images ( self ) :
""" Queues all captured images to be sent. """
for image_path in self . captured_images :
self . add_image_to_send_queue ( image_path )
self . captured_images . clear ( ) # Clear the list after sending
async def play_audiosegments ( self ) :
async def play_audiosegments ( self ) :
""" Plays them sequentially. """
""" Plays them sequentially. """
while True :
while True :
@ -112,6 +175,8 @@ class Device:
send_queue . put ( { " role " : " user " , " type " : " audio " , " format " : " bytes.wav " , " content " : " " } )
send_queue . put ( { " role " : " user " , " type " : " audio " , " format " : " bytes.wav " , " content " : " " } )
send_queue . put ( { " role " : " user " , " type " : " audio " , " format " : " bytes.wav " , " end " : True } )
send_queue . put ( { " role " : " user " , " type " : " audio " , " format " : " bytes.wav " , " end " : True } )
else :
else :
self . queue_all_captured_images ( )
if os . getenv ( ' STT_RUNNER ' ) == " client " :
if os . getenv ( ' STT_RUNNER ' ) == " client " :
# Run stt then send text
# Run stt then send text
text = stt_wav ( wav_path )
text = stt_wav ( wav_path )
@ -142,18 +207,28 @@ class Device:
RECORDING = False
RECORDING = False
def on_press ( self , key ) :
def on_press ( self , key ) :
""" Detect spacebar press. """
""" Detect spacebar press, ESC key press, and Ctrl+C combination. """
if key == keyboard . Key . space :
self . pressed_keys . add ( key ) # Add the pressed key to the set
if keyboard . Key . esc in self . pressed_keys :
logger . info ( " Exiting... " )
os . _exit ( 0 )
elif keyboard . Key . space in self . pressed_keys :
self . toggle_recording ( True )
self . toggle_recording ( True )
elif { keyboard . Key . ctrl , keyboard . KeyCode . from_char ( ' c ' ) } < = self . pressed_keys :
logger . info ( " Ctrl+C pressed. Exiting... " )
os . _exit ( 0 )
def on_release ( self , key ) :
def on_release ( self , key ) :
""" Detect spacebar release and ESC key press. """
""" Detect spacebar release and ' c ' key press for camera, and handle key release. """
self . pressed_keys . discard ( key ) # Remove the released key from the key press tracking set
if key == keyboard . Key . space :
if key == keyboard . Key . space :
self . toggle_recording ( False )
self . toggle_recording ( False )
elif key == keyboard . Key . esc or ( key == keyboard . Key . ctrl and keyboard . Key . c ) :
elif CAMERA_ENABLED and key == keyboard . KeyCode . from_char ( ' c ' ) :
logger . info ( " Exiting... " )
self . fetch_image_from_camera ( )
os . _exit ( 0 )
async def message_sender ( self , websocket ) :
async def message_sender ( self , websocket ) :
while True :
while True :
message = await asyncio . get_event_loop ( ) . run_in_executor ( None , send_queue . get )
message = await asyncio . get_event_loop ( ) . run_in_executor ( None , send_queue . get )
@ -168,7 +243,11 @@ class Device:
while True :
while True :
try :
try :
async with websockets . connect ( WS_URL ) as websocket :
async with websockets . connect ( WS_URL ) as websocket :
logger . info ( " Press the spacebar to start/stop recording. Press ESC to exit. " )
if CAMERA_ENABLED :
logger . info ( " Press the spacebar to start/stop recording. Press ' c ' to capture an image from the camera. Press ESC to exit. " )
else :
logger . info ( " Press the spacebar to start/stop recording. Press ESC to exit. " )
asyncio . create_task ( self . message_sender ( websocket ) )
asyncio . create_task ( self . message_sender ( websocket ) )
while True :
while True :