diff --git a/01OS/01OS/clients/base_device.py b/01OS/01OS/clients/base_device.py index f2197a4..78e31e5 100644 --- a/01OS/01OS/clients/base_device.py +++ b/01OS/01OS/clients/base_device.py @@ -21,6 +21,8 @@ import time import wave import tempfile from datetime import datetime +from mss import mss +import cv2 from interpreter import interpreter # Just for code execution. Maybe we should let people do from interpreter.computer import run? from ..server.utils.kernel import put_kernel_messages_into_queue from ..server.utils.get_system_info import get_system_info @@ -38,6 +40,11 @@ RATE = 44100 # Sample rate RECORDING = False # Flag to control recording state SPACEBAR_PRESSED = False # Flag to track spacebar press state +# Camera configuration +CAMERA_ENABLED = bool(os.getenv('CAMERA_ENABLED', False)) +CAMERA_DEVICE_INDEX = int(os.getenv('CAMERA_DEVICE_INDEX', 0)) +CAMERA_WARMUP_SECONDS = float(os.getenv('CAMERA_WARMUP_SECONDS', 0)) + # Specify OS current_platform = get_system_info() @@ -52,6 +59,34 @@ class Device: def __init__(self): pass + def fetch_image_from_camera(self, camera_index=CAMERA_DEVICE_INDEX): + """Captures an image from the specified camera device and saves it to a temporary file. Returns the path to the saved image file.""" + image_path = None + cap = cv2.VideoCapture(camera_index) + ret, frame = cap.read() # Capture a single frame to initialize the camera + + if CAMERA_WARMUP_SECONDS > 0: + # Allow camera to warm up, then snap a picture again + # This is a workaround for some cameras that don't return a properly exposed + # picture immediately when they are first turned on + time.sleep(CAMERA_WARMUP_SECONDS) + ret, frame = cap.read() + + if ret: + temp_dir = tempfile.gettempdir() + image_path = os.path.join(temp_dir, f"01_photo_{datetime.now().strftime('%Y%m%d%H%M%S%f')}.jpg") + cv2.imwrite(image_path, frame) + logger.info(f"Camera image captured to {image_path}") + # if os.name == 'posix': + # os.system(f"open {image_path}") + else: + logger.error("Error: Couldn't capture an image from camera ({camera_index})") + + cap.release() + + return image_path + + def record_audio(self): if os.getenv('STT_RUNNER') == "server": @@ -125,17 +160,20 @@ class Device: RECORDING = False def on_press(self, key): - """Detect spacebar press.""" - if key == keyboard.Key.space: + """Detect spacebar press and ESC key press.""" + if key == keyboard.Key.esc or (key == keyboard.Key.ctrl and key.char == 'c'): + logger.info("Exiting...") + os._exit(0) + elif key == keyboard.Key.space: self.toggle_recording(True) def on_release(self, key): - """Detect spacebar release and ESC key press.""" + """Detect spacebar release and 'c' key press for camera.""" if key == keyboard.Key.space: self.toggle_recording(False) - elif key == keyboard.Key.esc or (key == keyboard.Key.ctrl and keyboard.Key.c): - logger.info("Exiting...") - os._exit(0) + elif key.char == 'c' and CAMERA_ENABLED: + self.fetch_image_from_camera() + async def message_sender(self, websocket): while True: @@ -147,7 +185,11 @@ class Device: while True: try: async with websockets.connect(WS_URL) as websocket: - logger.info("Press the spacebar to start/stop recording. Press ESC to exit.") + if CAMERA_ENABLED: + logger.info("Press the spacebar to start/stop recording. Press 'c' to capture an image from the camera. Press ESC to exit.") + else: + logger.info("Press the spacebar to start/stop recording. Press ESC to exit.") + asyncio.create_task(self.message_sender(websocket)) initial_message = {"role": None, "type": None, "format": None, "content": None}