|
|
|
@ -1,6 +1,8 @@
|
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
|
load_dotenv() # take environment variables from .env.
|
|
|
|
|
|
|
|
|
|
import os
|
|
|
|
|
import warnings
|
|
|
|
|
import asyncio
|
|
|
|
|
import threading
|
|
|
|
|
import os
|
|
|
|
@ -21,7 +23,6 @@ import time
|
|
|
|
|
import wave
|
|
|
|
|
import tempfile
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
from mss import mss
|
|
|
|
|
import cv2
|
|
|
|
|
from interpreter import interpreter # Just for code execution. Maybe we should let people do from interpreter.computer import run?
|
|
|
|
|
from ..server.utils.kernel import put_kernel_messages_into_queue
|
|
|
|
@ -57,13 +58,15 @@ send_queue = queue.Queue()
|
|
|
|
|
|
|
|
|
|
class Device:
|
|
|
|
|
def __init__(self):
|
|
|
|
|
pass
|
|
|
|
|
self.pressed_keys = set()
|
|
|
|
|
self.captured_images = []
|
|
|
|
|
|
|
|
|
|
def fetch_image_from_camera(self, camera_index=CAMERA_DEVICE_INDEX):
|
|
|
|
|
"""Captures an image from the specified camera device and saves it to a temporary file. Returns the path to the saved image file."""
|
|
|
|
|
"""Captures an image from the specified camera device and saves it to a temporary file. Adds the image to the captured_images list."""
|
|
|
|
|
image_path = None
|
|
|
|
|
|
|
|
|
|
cap = cv2.VideoCapture(camera_index)
|
|
|
|
|
ret, frame = cap.read() # Capture a single frame to initialize the camera
|
|
|
|
|
ret, frame = cap.read() # Capture a single frame to initialize the camera
|
|
|
|
|
|
|
|
|
|
if CAMERA_WARMUP_SECONDS > 0:
|
|
|
|
|
# Allow camera to warm up, then snap a picture again
|
|
|
|
@ -71,17 +74,17 @@ class Device:
|
|
|
|
|
# picture immediately when they are first turned on
|
|
|
|
|
time.sleep(CAMERA_WARMUP_SECONDS)
|
|
|
|
|
ret, frame = cap.read()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if ret:
|
|
|
|
|
temp_dir = tempfile.gettempdir()
|
|
|
|
|
image_path = os.path.join(temp_dir, f"01_photo_{datetime.now().strftime('%Y%m%d%H%M%S%f')}.jpg")
|
|
|
|
|
self.captured_images.append(image_path)
|
|
|
|
|
cv2.imwrite(image_path, frame)
|
|
|
|
|
logger.info(f"Camera image captured to {image_path}")
|
|
|
|
|
# if os.name == 'posix':
|
|
|
|
|
# os.system(f"open {image_path}")
|
|
|
|
|
logger.info(f"You now have {len(self.captured_images)} images which will be sent along with your next audio message.")
|
|
|
|
|
else:
|
|
|
|
|
logger.error("Error: Couldn't capture an image from camera ({camera_index})")
|
|
|
|
|
|
|
|
|
|
logger.error(f"Error: Couldn't capture an image from camera ({camera_index})")
|
|
|
|
|
|
|
|
|
|
cap.release()
|
|
|
|
|
|
|
|
|
|
return image_path
|
|
|
|
@ -160,21 +163,28 @@ class Device:
|
|
|
|
|
RECORDING = False
|
|
|
|
|
|
|
|
|
|
def on_press(self, key):
|
|
|
|
|
"""Detect spacebar press and ESC key press."""
|
|
|
|
|
if key == keyboard.Key.esc or (key == keyboard.Key.ctrl and key.char == 'c'):
|
|
|
|
|
"""Detect spacebar press, ESC key press, and Ctrl+C combination."""
|
|
|
|
|
self.pressed_keys.add(key) # Add the pressed key to the set
|
|
|
|
|
|
|
|
|
|
if keyboard.Key.esc in self.pressed_keys:
|
|
|
|
|
logger.info("Exiting...")
|
|
|
|
|
os._exit(0)
|
|
|
|
|
elif key == keyboard.Key.space:
|
|
|
|
|
elif keyboard.Key.space in self.pressed_keys:
|
|
|
|
|
self.toggle_recording(True)
|
|
|
|
|
elif {keyboard.Key.ctrl, keyboard.KeyCode.from_char('c')} <= self.pressed_keys:
|
|
|
|
|
logger.info("Ctrl+C pressed. Exiting...")
|
|
|
|
|
os._exit(0)
|
|
|
|
|
|
|
|
|
|
def on_release(self, key):
|
|
|
|
|
"""Detect spacebar release and 'c' key press for camera."""
|
|
|
|
|
"""Detect spacebar release and 'c' key press for camera, and handle key release."""
|
|
|
|
|
self.pressed_keys.discard(key) # Remove the released key from the key press tracking set
|
|
|
|
|
|
|
|
|
|
if key == keyboard.Key.space:
|
|
|
|
|
self.toggle_recording(False)
|
|
|
|
|
elif key.char == 'c' and CAMERA_ENABLED:
|
|
|
|
|
elif CAMERA_ENABLED and hasattr(key, 'char') and key == keyboard.KeyCode.from_char('c'):
|
|
|
|
|
self.fetch_image_from_camera()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def message_sender(self, websocket):
|
|
|
|
|
while True:
|
|
|
|
|
message = await asyncio.get_event_loop().run_in_executor(None, send_queue.get)
|
|
|
|
|