diff --git a/01OS/01OS/clients/base_device.py b/01OS/01OS/clients/base_device.py index 78e31e5..e198cba 100644 --- a/01OS/01OS/clients/base_device.py +++ b/01OS/01OS/clients/base_device.py @@ -1,6 +1,8 @@ from dotenv import load_dotenv load_dotenv() # take environment variables from .env. +import os +import warnings import asyncio import threading import os @@ -21,7 +23,6 @@ import time import wave import tempfile from datetime import datetime -from mss import mss import cv2 from interpreter import interpreter # Just for code execution. Maybe we should let people do from interpreter.computer import run? from ..server.utils.kernel import put_kernel_messages_into_queue @@ -57,13 +58,15 @@ send_queue = queue.Queue() class Device: def __init__(self): - pass + self.pressed_keys = set() + self.captured_images = [] def fetch_image_from_camera(self, camera_index=CAMERA_DEVICE_INDEX): - """Captures an image from the specified camera device and saves it to a temporary file. Returns the path to the saved image file.""" + """Captures an image from the specified camera device and saves it to a temporary file. Adds the image to the captured_images list.""" image_path = None + cap = cv2.VideoCapture(camera_index) - ret, frame = cap.read() # Capture a single frame to initialize the camera + ret, frame = cap.read() # Capture a single frame to initialize the camera if CAMERA_WARMUP_SECONDS > 0: # Allow camera to warm up, then snap a picture again @@ -71,17 +74,17 @@ class Device: # picture immediately when they are first turned on time.sleep(CAMERA_WARMUP_SECONDS) ret, frame = cap.read() - + if ret: temp_dir = tempfile.gettempdir() image_path = os.path.join(temp_dir, f"01_photo_{datetime.now().strftime('%Y%m%d%H%M%S%f')}.jpg") + self.captured_images.append(image_path) cv2.imwrite(image_path, frame) logger.info(f"Camera image captured to {image_path}") - # if os.name == 'posix': - # os.system(f"open {image_path}") + logger.info(f"You now have {len(self.captured_images)} images which will be sent along with your next audio message.") else: - logger.error("Error: Couldn't capture an image from camera ({camera_index})") - + logger.error(f"Error: Couldn't capture an image from camera ({camera_index})") + cap.release() return image_path @@ -160,21 +163,28 @@ class Device: RECORDING = False def on_press(self, key): - """Detect spacebar press and ESC key press.""" - if key == keyboard.Key.esc or (key == keyboard.Key.ctrl and key.char == 'c'): + """Detect spacebar press, ESC key press, and Ctrl+C combination.""" + self.pressed_keys.add(key) # Add the pressed key to the set + + if keyboard.Key.esc in self.pressed_keys: logger.info("Exiting...") os._exit(0) - elif key == keyboard.Key.space: + elif keyboard.Key.space in self.pressed_keys: self.toggle_recording(True) + elif {keyboard.Key.ctrl, keyboard.KeyCode.from_char('c')} <= self.pressed_keys: + logger.info("Ctrl+C pressed. Exiting...") + os._exit(0) def on_release(self, key): - """Detect spacebar release and 'c' key press for camera.""" + """Detect spacebar release and 'c' key press for camera, and handle key release.""" + self.pressed_keys.discard(key) # Remove the released key from the key press tracking set + if key == keyboard.Key.space: self.toggle_recording(False) - elif key.char == 'c' and CAMERA_ENABLED: + elif CAMERA_ENABLED and hasattr(key, 'char') and key == keyboard.KeyCode.from_char('c'): self.fetch_image_from_camera() - + async def message_sender(self, websocket): while True: message = await asyncio.get_event_loop().run_in_executor(None, send_queue.get)