From 3f6ba52a0e97f2c8477bb0fa2158f3a9c46daa25 Mon Sep 17 00:00:00 2001 From: Ben Xu Date: Mon, 30 Dec 2024 15:27:23 -0500 Subject: [PATCH] add meet flag and better error handling --- software/main.py | 140 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 112 insertions(+), 28 deletions(-) diff --git a/software/main.py b/software/main.py index d8437d5..428a2f7 100644 --- a/software/main.py +++ b/software/main.py @@ -2,6 +2,8 @@ from yaspin import yaspin spinner = yaspin() spinner.start() + +import sys import typer import ngrok import platform @@ -10,7 +12,6 @@ import os import importlib from source.server.server import start_server import subprocess -import webview import socket import json import segno @@ -20,8 +21,11 @@ from dotenv import load_dotenv import signal from source.server.livekit.worker import main as worker_main from source.server.livekit.multimodal import main as multimodal_main -import warnings + import requests +import webbrowser +from pathlib import Path +import shutil load_dotenv() @@ -29,6 +33,17 @@ system_type = platform.system() app = typer.Typer() +def check_pnpm(): + """Check if pnpm is installed.""" + if not shutil.which("pnpm"): + raise typer.BadParameter( + "pnpm is required to run the meet interface. Please install it first: https://pnpm.io/installation" + ) + + +ROOM_NAME = "my-room" +AGENT_NAME = "light" + @app.command() def run( server: str = typer.Option( @@ -77,6 +92,11 @@ def run( "--multimodal", help="Run the multimodal agent", ), + meet: bool = typer.Option( + False, + "--meet", + help="Run the web-based meeting interface locally", + ), ): threads = [] @@ -153,6 +173,7 @@ def run( debug ), ) + spinner.stop() print("Starting server...") server_thread.start() @@ -162,7 +183,29 @@ def run( ### LIVEKIT SERVER def run_command(command): - subprocess.run(command, shell=True, check=True) + while True: + process = subprocess.run(command, shell=True, check=True, preexec_fn=os.setsid) + + url = f"http://{server_host}:{server_port}" + while True: + time.sleep(5) + try: + response = requests.get(url) + if response.status_code == 200: + print("livekit server is running") + else: + print("request failed: ", response.status_code) + break + except requests.RequestException: + print("request exception") + break + + print("Server failed to start, retrying...") + try: + os.killpg(os.getpgid(process.pid), signal.SIGTERM) + except ProcessLookupError: + pass # Process group already terminated + # Start the livekit server if debug: @@ -179,6 +222,7 @@ def run( local_livekit_url = f"ws://{server_host}:{server_port}" if expose: + ### EXPOSE OVER INTERNET listener = ngrok.forward(f"{server_host}:{server_port}", authtoken_from_env=True, domain=domain) @@ -218,11 +262,21 @@ def run( # Signal handler for termination signals def signal_handler(sig, frame): print("Termination signal received. Shutting down...") - for thread in threads: - if thread.is_alive(): - # Kill subprocess associated with thread - subprocess.run(f"pkill -P {os.getpid()}", shell=True) - os._exit(0) + try: + # Kill all processes in our process group + os.killpg(os.getpid(), signal.SIGTERM) + + # Additional cleanup for any remaining threads + for thread in threads: + if thread.is_alive(): + try: + subprocess.run(f"pkill -P {os.getpid()}", shell=True) + except: + pass + except: + pass + finally: + os._exit(0) # Register signal handler for SIGINT and SIGTERM signal.signal(signal.SIGINT, signal_handler) @@ -235,6 +289,7 @@ def run( response = requests.get(url) status = "OK" if response.status_code == 200 else "Not OK" if status == "OK": + print("livekit server is running") break except requests.RequestException: pass @@ -247,7 +302,7 @@ def run( .with_name("You") \ .with_grants(api.VideoGrants( room_join=True, - room="my-room", + room=ROOM_NAME, )).to_jwt()) ### DISPLAY QR CODE @@ -261,6 +316,28 @@ def run( qr_thread = threading.Thread(target=display_qr_code) qr_thread.start() threads.append(qr_thread) + + + if meet: + check_pnpm() + # Get the path to the meet client directory + meet_client_path = Path(__file__).parent / "source" / "clients" / "meet" + + # Install dependencies if needed + spinner.text = "Installing meet client dependencies..." + subprocess.run(["pnpm", "install"], cwd=meet_client_path, check=True) + + # Start the Next.js dev server in a separate thread + def run_next_server(): + subprocess.run(["pnpm", "dev"], cwd=meet_client_path, check=True) + + next_server_thread = threading.Thread(target=run_next_server) + next_server_thread.daemon = True + next_server_thread.start() + threads.append(next_server_thread) + + # Wait for Next.js server to start + time.sleep(3) ### START LIVEKIT WORKER if server == "livekit": @@ -271,25 +348,32 @@ def run( os.environ['01_TTS'] = interpreter.tts os.environ['01_STT'] = interpreter.stt - # meet_url = f'http://localhost:3000/custom?liveKitUrl={url.replace("http", "ws")}&token={token}\n\n' - meet_url = f'https://meet.livekit.io/custom?liveKitUrl={url.replace("http", "ws")}&token={participant_token}\n\n' - print("\n") - print("For debugging, you can join a video call with your assistant. Click the link below, then send a chat message that says {CONTEXT_MODE_OFF}, then begin speaking:") - print(meet_url) - - for attempt in range(30): - try: - if multimodal: - multimodal_main(local_livekit_url) - else: - worker_main(local_livekit_url) - except KeyboardInterrupt: - print("Exiting.") - raise - except Exception as e: - print(f"Error occurred: {e}") - print("Retrying...") - time.sleep(1) + if debug and not meet: + meet_url = f'http://localhost:3000/custom?liveKitUrl={url.replace("http", "ws")}&token={participant_token}\n\n' + print("\n") + print("For debugging, you can join a video call with your assistant. Click the link below, then send a chat message that says {CONTEXT_MODE_OFF}, then begin speaking:") + print(meet_url) + + # Open the browser with the pre-configured URL + if meet: + meet_url = f'http://localhost:3000/custom?liveKitUrl={url.replace("http", "ws")}&token={participant_token}' + spinner.stop() + print(f"\nOpening meet interface at: {meet_url}") + webbrowser.open(meet_url) + + print(f"multimodal flag is: {multimodal}") + try: + if multimodal: + multimodal_main(url) + else: + print("Starting worker...") + worker_main(url) + + except KeyboardInterrupt: + print("Exiting.") + raise + except Exception as e: + print(f"Error occurred: {e}") # Wait for all threads to complete for thread in threads: