|
|
|
@ -3,6 +3,7 @@ from dotenv import load_dotenv
|
|
|
|
|
load_dotenv() # take environment variables from .env.
|
|
|
|
|
|
|
|
|
|
import os
|
|
|
|
|
import sys
|
|
|
|
|
import asyncio
|
|
|
|
|
import threading
|
|
|
|
|
import pyaudio
|
|
|
|
@ -58,7 +59,16 @@ CAMERA_WARMUP_SECONDS = float(os.getenv("CAMERA_WARMUP_SECONDS", 0))
|
|
|
|
|
|
|
|
|
|
# Specify OS
|
|
|
|
|
current_platform = get_system_info()
|
|
|
|
|
is_win10 = lambda: platform.system() == "Windows" and "10" in platform.version()
|
|
|
|
|
|
|
|
|
|
def is_win11():
|
|
|
|
|
return sys.getwindowsversion().build >= 22000
|
|
|
|
|
|
|
|
|
|
def is_win10():
|
|
|
|
|
try:
|
|
|
|
|
return platform.system() == "Windows" and "10" in platform.version() and not is_win11()
|
|
|
|
|
except:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Initialize PyAudio
|
|
|
|
|
p = pyaudio.PyAudio()
|
|
|
|
@ -72,6 +82,7 @@ class Device:
|
|
|
|
|
self.captured_images = []
|
|
|
|
|
self.audiosegments = []
|
|
|
|
|
self.server_url = ""
|
|
|
|
|
self.ctrl_pressed = False
|
|
|
|
|
|
|
|
|
|
def fetch_image_from_camera(self, camera_index=CAMERA_DEVICE_INDEX):
|
|
|
|
|
"""Captures an image from the specified camera device and saves it to a temporary file. Adds the image to the captured_images list."""
|
|
|
|
@ -256,23 +267,39 @@ class Device:
|
|
|
|
|
def on_press(self, key):
|
|
|
|
|
"""Detect spacebar press and Ctrl+C combination."""
|
|
|
|
|
self.pressed_keys.add(key) # Add the pressed key to the set
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if keyboard.Key.space in self.pressed_keys:
|
|
|
|
|
self.toggle_recording(True)
|
|
|
|
|
elif {keyboard.Key.ctrl, keyboard.KeyCode.from_char("c")} <= self.pressed_keys:
|
|
|
|
|
elif {keyboard.Key.ctrl, keyboard.KeyCode.from_char('c')} <= self.pressed_keys:
|
|
|
|
|
logger.info("Ctrl+C pressed. Exiting...")
|
|
|
|
|
kill_process_tree()
|
|
|
|
|
os._exit(0)
|
|
|
|
|
|
|
|
|
|
# Windows alternative to the above
|
|
|
|
|
if key == keyboard.Key.ctrl_l:
|
|
|
|
|
self.ctrl_pressed = True
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
if key.vk == 67 and self.ctrl_pressed:
|
|
|
|
|
logger.info("Ctrl+C pressed. Exiting...")
|
|
|
|
|
kill_process_tree()
|
|
|
|
|
os._exit(0)
|
|
|
|
|
# For non-character keys
|
|
|
|
|
except:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def on_release(self, key):
|
|
|
|
|
"""Detect spacebar release and 'c' key press for camera, and handle key release."""
|
|
|
|
|
self.pressed_keys.discard(
|
|
|
|
|
key
|
|
|
|
|
) # Remove the released key from the key press tracking set
|
|
|
|
|
self.pressed_keys.discard(key) # Remove the released key from the key press tracking set
|
|
|
|
|
|
|
|
|
|
if key == keyboard.Key.ctrl_l:
|
|
|
|
|
self.ctrl_pressed = False
|
|
|
|
|
if key == keyboard.Key.space:
|
|
|
|
|
self.toggle_recording(False)
|
|
|
|
|
elif CAMERA_ENABLED and key == keyboard.KeyCode.from_char("c"):
|
|
|
|
|
elif CAMERA_ENABLED and key == keyboard.KeyCode.from_char('c'):
|
|
|
|
|
self.fetch_image_from_camera()
|
|
|
|
|
|
|
|
|
|
async def message_sender(self, websocket):
|
|
|
|
@ -342,7 +369,7 @@ class Device:
|
|
|
|
|
code = message["content"]
|
|
|
|
|
result = interpreter.computer.run(language, code)
|
|
|
|
|
send_queue.put(result)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if is_win10():
|
|
|
|
|
logger.info("Windows 10 detected")
|
|
|
|
|
# Workaround for Windows 10 not latching to the websocket server.
|
|
|
|
|