adding camera capture on "c" press

pull/34/head
Tom Chapin 11 months ago
parent 0b30790a4c
commit f18bfb73c1

@ -21,6 +21,8 @@ import time
import wave import wave
import tempfile import tempfile
from datetime import datetime from datetime import datetime
from mss import mss
import cv2
from interpreter import interpreter # Just for code execution. Maybe we should let people do from interpreter.computer import run? from interpreter import interpreter # Just for code execution. Maybe we should let people do from interpreter.computer import run?
from ..server.utils.kernel import put_kernel_messages_into_queue from ..server.utils.kernel import put_kernel_messages_into_queue
from ..server.utils.get_system_info import get_system_info from ..server.utils.get_system_info import get_system_info
@ -38,6 +40,11 @@ RATE = 44100 # Sample rate
RECORDING = False # Flag to control recording state RECORDING = False # Flag to control recording state
SPACEBAR_PRESSED = False # Flag to track spacebar press state SPACEBAR_PRESSED = False # Flag to track spacebar press state
# Camera configuration
CAMERA_ENABLED = bool(os.getenv('CAMERA_ENABLED', False))
CAMERA_DEVICE_INDEX = int(os.getenv('CAMERA_DEVICE_INDEX', 0))
CAMERA_WARMUP_SECONDS = float(os.getenv('CAMERA_WARMUP_SECONDS', 0))
# Specify OS # Specify OS
current_platform = get_system_info() current_platform = get_system_info()
@ -52,6 +59,34 @@ class Device:
def __init__(self): def __init__(self):
pass pass
def fetch_image_from_camera(self, camera_index=CAMERA_DEVICE_INDEX):
"""Captures an image from the specified camera device and saves it to a temporary file. Returns the path to the saved image file."""
image_path = None
cap = cv2.VideoCapture(camera_index)
ret, frame = cap.read() # Capture a single frame to initialize the camera
if CAMERA_WARMUP_SECONDS > 0:
# Allow camera to warm up, then snap a picture again
# This is a workaround for some cameras that don't return a properly exposed
# picture immediately when they are first turned on
time.sleep(CAMERA_WARMUP_SECONDS)
ret, frame = cap.read()
if ret:
temp_dir = tempfile.gettempdir()
image_path = os.path.join(temp_dir, f"01_photo_{datetime.now().strftime('%Y%m%d%H%M%S%f')}.jpg")
cv2.imwrite(image_path, frame)
logger.info(f"Camera image captured to {image_path}")
# if os.name == 'posix':
# os.system(f"open {image_path}")
else:
logger.error("Error: Couldn't capture an image from camera ({camera_index})")
cap.release()
return image_path
def record_audio(self): def record_audio(self):
if os.getenv('STT_RUNNER') == "server": if os.getenv('STT_RUNNER') == "server":
@ -125,17 +160,20 @@ class Device:
RECORDING = False RECORDING = False
def on_press(self, key): def on_press(self, key):
"""Detect spacebar press.""" """Detect spacebar press and ESC key press."""
if key == keyboard.Key.space: if key == keyboard.Key.esc or (key == keyboard.Key.ctrl and key.char == 'c'):
logger.info("Exiting...")
os._exit(0)
elif key == keyboard.Key.space:
self.toggle_recording(True) self.toggle_recording(True)
def on_release(self, key): def on_release(self, key):
"""Detect spacebar release and ESC key press.""" """Detect spacebar release and 'c' key press for camera."""
if key == keyboard.Key.space: if key == keyboard.Key.space:
self.toggle_recording(False) self.toggle_recording(False)
elif key == keyboard.Key.esc or (key == keyboard.Key.ctrl and keyboard.Key.c): elif key.char == 'c' and CAMERA_ENABLED:
logger.info("Exiting...") self.fetch_image_from_camera()
os._exit(0)
async def message_sender(self, websocket): async def message_sender(self, websocket):
while True: while True:
@ -147,7 +185,11 @@ class Device:
while True: while True:
try: try:
async with websockets.connect(WS_URL) as websocket: async with websockets.connect(WS_URL) as websocket:
if CAMERA_ENABLED:
logger.info("Press the spacebar to start/stop recording. Press 'c' to capture an image from the camera. Press ESC to exit.")
else:
logger.info("Press the spacebar to start/stop recording. Press ESC to exit.") logger.info("Press the spacebar to start/stop recording. Press ESC to exit.")
asyncio.create_task(self.message_sender(websocket)) asyncio.create_task(self.message_sender(websocket))
initial_message = {"role": None, "type": None, "format": None, "content": None} initial_message = {"role": None, "type": None, "format": None, "content": None}

Loading…
Cancel
Save