From ab8055e0de024e8b669693e01ce3527e0ce5b684 Mon Sep 17 00:00:00 2001 From: Ben Xu Date: Wed, 1 Jan 2025 03:51:46 -0500 Subject: [PATCH] draft main cli --- software/main.py | 411 +++++++++++------------------------------------ software/test.py | 136 ++++++++++++++++ 2 files changed, 234 insertions(+), 313 deletions(-) create mode 100644 software/test.py diff --git a/software/main.py b/software/main.py index c635afd..2f794dc 100644 --- a/software/main.py +++ b/software/main.py @@ -1,77 +1,68 @@ -from yaspin import yaspin -spinner = yaspin() -spinner.start() - - -import sys +import subprocess +import time +import os import typer -import ngrok import platform -import threading -import os -import importlib -from source.server.server import start_server -import subprocess -import socket -import json +import webbrowser +import psutil +import ngrok import segno +import json + +from pathlib import Path from livekit import api -import time -from dotenv import load_dotenv -import signal from source.server.livekit.worker import main as worker_main from source.server.livekit.multimodal import main as multimodal_main -import requests -import webbrowser -from pathlib import Path -import shutil +from dotenv import load_dotenv -load_dotenv() -system_type = platform.system() +load_dotenv() +system_type = platform.system() app = typer.Typer() -def check_pnpm(): - """Check if pnpm is installed.""" - if not shutil.which("pnpm"): - raise typer.BadParameter( - "pnpm is required to run the meet interface. Please install it first: https://pnpm.io/installation" - ) ROOM_NAME = "my-room" -AGENT_NAME = "light" + + +def pre_clean_process(port): + """Find and kill process running on specified port""" + for proc in psutil.process_iter(['pid', 'name', 'connections']): + try: + for conn in proc.connections(): + if conn.laddr.port == port: + print(f"Killing process {proc.pid} ({proc.name()}) on port {port}") + proc.terminate() + proc.wait() + return True + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + return False + + +def cleanup_processes(processes): + for process in processes: + if process.poll() is None: # if process is still running + process.terminate() + process.wait() # wait for process to terminate + @app.command() def run( - server: str = typer.Option( - None, - "--server", - help="Run server (accepts `livekit` or `light`)", - ), - server_host: str = typer.Option( + host: str = typer.Option( "0.0.0.0", - "--server-host", - help="Specify the server host where the server will deploy", + "--host", + help="Specify the server host where the livekit server will deploy. For other devices on your network to connect to it, keep it on default `0.0.0.0`", ), - server_port: int = typer.Option( + port: int = typer.Option( 10101, - "--server-port", - help="Specify the server port where the server will deploy", - ), - expose: bool = typer.Option(False, "--expose", help="Expose server over the internet"), - domain: str = typer.Option(None, "--domain", help="Use `--expose` with a custom ngrok domain"), - client: str = typer.Option(None, "--client", help="Run client of a particular type. Accepts `light-python`, defaults to `light-python`"), - server_url: str = typer.Option( - None, - "--server-url", - help="Specify the server URL that the --client should expect. Defaults to server-host and server-port", - ), - qr: bool = typer.Option( - False, "--qr", help="Display QR code containing the server connection information (will be ngrok url if `--expose` is used)" + "--port", + help="Specify the server port where the livekit server will deploy", ), + domain: str = typer.Option(None, "--domain", help="Pass in a custom ngrok domain to expose the livekit server over the internet"), + client: str = typer.Option(None, "--client", help="Run client of a particular type. Accepts `meet` or `mobile`, defaults to `meet`"), profiles: bool = typer.Option( False, "--profiles", @@ -91,24 +82,12 @@ def run( False, "--multimodal", help="Run the multimodal agent", - ), - meet: bool = typer.Option( - False, - "--meet", - help="Run the web-based meeting interface locally", - ), -): - - threads = [] - - # Handle `01` with no arguments, which should start server + client - if not server and not client: - server = "light" - client = "light-python" - - ### PROFILES - - profiles_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "source", "server", "profiles") + ) +): + # preprocess ports + ports = [10101, 8000, 3000] + for port in ports: + pre_clean_process(port) if profiles: if platform.system() == "Windows": @@ -121,6 +100,8 @@ def run( subprocess.Popen(['open', profiles_dir]) exit(0) + profiles_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "source", "server", "profiles") + if profile: if not os.path.isfile(profile): profile = os.path.join(profiles_dir, profile) @@ -130,262 +111,66 @@ def run( print(f"Invalid profile path: {profile}") exit(1) - # Load the profile module from the provided path - spec = importlib.util.spec_from_file_location("profile", profile) - profile_module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(profile_module) - - # Get the interpreter from the profile - interpreter = profile_module.interpreter - - ### SERVER - - if system_type == "Windows": - server_host = "localhost" - - if not server_url: - server_url = f"{server_host}:{server_port}" - - if server: - - ### LIGHT SERVER (required by livekit) - - if server == "light": - light_server_port = server_port - light_server_host = server_host - voice = True # The light server will support voice - elif server == "livekit": - # The light server should run at a different port if we want to run a livekit server - spinner.stop() - print(f"Starting light server (required for livekit server) on localhost, on the port before `--server-port` (port {server_port-1}), unless the `AN_OPEN_PORT` env var is set.") - print(f"The livekit server will be started on port {server_port}.") - light_server_port = os.getenv('AN_OPEN_PORT', server_port-1) - light_server_host = "localhost" - voice = False # The light server will NOT support voice. It will just run Open Interpreter. The Livekit server will handle voice - - server_thread = threading.Thread( - target=start_server, - args=( - light_server_host, - light_server_port, - interpreter, - voice, - debug - ), - ) - - spinner.stop() - print("Starting server...") - server_thread.start() - threads.append(server_thread) - - if server == "livekit": - - ### LIVEKIT SERVER - def run_command(command): - i = 0 - while True: - print("i is: ", i) - if i > 0: - process = subprocess.run(command, shell=True, check=True, preexec_fn=os.setsid) - else: - print("Skipping server start (first iteration)") - - url = f"http://{server_host}:{server_port}" - while True: - time.sleep(5) - try: - print("Checking server status... with i = ", i) - response = requests.get(url) - if response.status_code == 200: - continue - else: - print("request failed: ", response.status_code) - break - except requests.RequestException: - print("request exception") - break - - print("Server failed to start, retrying...") - - try: - os.killpg(os.getpgid(process.pid), signal.SIGTERM) # This will fail when i=0 - except ProcessLookupError: - pass - - i += 1 - - - # Start the livekit server - if debug: - command = f'livekit-server --dev --bind "{server_host}" --port {server_port}' - else: - command = f'livekit-server --dev --bind "{server_host}" --port {server_port} > /dev/null 2>&1' - livekit_thread = threading.Thread( - target=run_command, args=(command,) - ) - - livekit_thread.start() - threads.append(livekit_thread) - - local_livekit_url = f"ws://{server_host}:{server_port}" - - if expose: - - ### EXPOSE OVER INTERNET - listener = ngrok.forward(f"{server_host}:{server_port}", authtoken_from_env=True, domain=domain) - url = listener.url() + OI_CMD = f"interpreter --serve --profile {profile}" + oi_server = subprocess.Popen(OI_CMD, shell=True) + print("Interpreter server started") + - else: - - ### GET LOCAL URL - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.connect(("8.8.8.8", 80)) - ip_address = s.getsockname()[0] - s.close() - url = f"http://{ip_address}:{server_port}" - - - if server == "livekit": - print("Livekit server will run at:", url) - - - ### CLIENT - - if client: - - module = importlib.import_module( - f".clients.{client}.client", package="source" - ) - - client_thread = threading.Thread(target=module.run, args=[server_url, debug]) - spinner.stop() - print("Starting client...") - client_thread.start() - threads.append(client_thread) - - - ### WAIT FOR THREADS TO FINISH, HANDLE CTRL-C + print("Starting livekit server...") + if debug: + LK_CMD = f"livekit-server --dev --bind {host} --port {port}" + else: + LK_CMD = f"livekit-server --dev --bind {host} --port {port} > /dev/null 2>&1" + + lk_server = subprocess.Popen(LK_CMD, shell=True) + print("Livekit server started") + time.sleep(2) - # Signal handler for termination signals - def signal_handler(sig, frame): - print("Termination signal received. Shutting down...") - try: - # Kill all processes in our process group - os.killpg(os.getpid(), signal.SIGTERM) - - # Additional cleanup for any remaining threads - for thread in threads: - if thread.is_alive(): - try: - subprocess.run(f"pkill -P {os.getpid()}", shell=True) - except: - pass - except: - pass - finally: - os._exit(0) - - # Register signal handler for SIGINT and SIGTERM - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - - try: - # Verify the server is running - for attempt in range(10): - try: - response = requests.get(url) - status = "OK" if response.status_code == 200 else "Not OK" - if status == "OK": - print("livekit server is running") - break - except requests.RequestException: - pass - time.sleep(1) - else: - raise Exception(f"Server at {url} failed to respond after 10 attempts") - - participant_token = str(api.AccessToken('devkey', 'secret') \ + lk_url = f"http://localhost:10101" + participant_token = str(api.AccessToken('devkey', 'secret') \ .with_identity("Participant") \ .with_name("You") \ .with_grants(api.VideoGrants( room_join=True, room=ROOM_NAME,)) .to_jwt()) - - ### DISPLAY QR CODE - if qr: - def display_qr_code(): - time.sleep(10) - content = json.dumps({"livekit_server": url, "token": participant_token}) - qr_code = segno.make(content) - qr_code.terminal(compact=True) - - qr_thread = threading.Thread(target=display_qr_code) - qr_thread.start() - threads.append(qr_thread) - - if meet: - check_pnpm() - # Get the path to the meet client directory - meet_client_path = Path(__file__).parent / "source" / "clients" / "meet" - - # Install dependencies if needed - spinner.text = "Installing meet client dependencies..." - subprocess.run(["pnpm", "install"], cwd=meet_client_path, check=True) - - # Start the Next.js dev server in a separate thread - def run_next_server(): - subprocess.run(["pnpm", "dev"], cwd=meet_client_path, check=True) - - next_server_thread = threading.Thread(target=run_next_server) - next_server_thread.daemon = True - next_server_thread.start() - threads.append(next_server_thread) - - # Wait for Next.js server to start - time.sleep(3) + processes = [lk_server, oi_server] - ### START LIVEKIT WORKER - if server == "livekit": - time.sleep(1) - # These are needed to communicate with the worker's entrypoint - os.environ['INTERPRETER_SERVER_HOST'] = light_server_host - os.environ['INTERPRETER_SERVER_PORT'] = str(light_server_port) - os.environ['01_TTS'] = interpreter.tts - os.environ['01_STT'] = interpreter.stt + if client == 'mobile': + listener = ngrok.forward(f"{host}:{port}", authtoken_from_env=True, domain=domain) + lk_url = listener.url() + print(f"Livekit server forwarded to: {lk_url}") - if debug and not meet: - meet_url = f'http://localhost:3000/custom?liveKitUrl={url.replace("http", "ws")}&token={participant_token}\n\n' - print("\n") - print("For debugging, you can join a video call with your assistant. Click the link below, then send a chat message that says {CONTEXT_MODE_OFF}, then begin speaking:") - print(meet_url) + print("Scan the QR code below with your mobile app to connect to the livekit server.") + content = json.dumps({"livekit_server": lk_url, "token": participant_token}) + qr_code = segno.make(content) + qr_code.terminal(compact=True) + else: # meet client + # Get the path to the meet client directory + meet_client_path = Path(__file__).parent / "source" / "clients" / "meet" - # Open the browser with the pre-configured URL - if meet: - meet_url = f'http://localhost:3000/custom?liveKitUrl={url.replace("http", "ws")}&token={participant_token}' - spinner.stop() - print(f"\nOpening meet interface at: {meet_url}") - webbrowser.open(meet_url) - - try: - if multimodal: - multimodal_main(url) - else: - print("Starting worker...") - worker_main(url) + print("Starting Next.js dev server...") + next_server = subprocess.Popen(["pnpm", "dev"], cwd=meet_client_path,) + print("Next.js dev server started") - except KeyboardInterrupt: - print("Exiting.") - raise - except Exception as e: - print(f"Error occurred: {e}") + time.sleep(2) + meet_url = f'http://localhost:3001/custom?liveKitUrl={lk_url.replace("http", "ws")}&token={participant_token}' + print(f"\nOpening meet interface at: {meet_url}") + webbrowser.open(meet_url) - # Wait for all threads to complete - for thread in threads: - thread.join() + processes.append(next_server) + + try: + print("Starting worker...") + if multimodal: + multimodal_main(lk_url) + else: + worker_main(lk_url) + print("Worker started") except KeyboardInterrupt: - # On KeyboardInterrupt, send SIGINT to self - os.kill(os.getpid(), signal.SIGINT) \ No newline at end of file + print("\nReceived interrupt signal, shutting down...") + finally: + print("Cleaning up processes...") + cleanup_processes(processes) \ No newline at end of file diff --git a/software/test.py b/software/test.py new file mode 100644 index 0000000..f6d7d8d --- /dev/null +++ b/software/test.py @@ -0,0 +1,136 @@ +import subprocess +import time +import os +import typer +import platform +import webbrowser +import psutil +import ngrok +import segno +import json + +from pathlib import Path +from livekit import api +from source.server.livekit.worker import main as worker_main +from source.server.livekit.multimodal import main as multimodal_main + +from dotenv import load_dotenv + + + +load_dotenv() +system_type = platform.system() +app = typer.Typer() + + + +ROOM_NAME = "my-room" + + +def pre_clean_process(port): + """Find and kill process running on specified port""" + for proc in psutil.process_iter(['pid', 'name', 'connections']): + try: + for conn in proc.connections(): + if conn.laddr.port == port: + print(f"Killing process {proc.pid} ({proc.name()}) on port {port}") + proc.terminate() + proc.wait() + return True + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + return False + + +def cleanup_processes(processes): + for process in processes: + if process.poll() is None: # if process is still running + process.terminate() + process.wait() # wait for process to terminate + +if __name__ == "__main__": + # preprocess ports + ports = [10101, 8000, 3000] + for port in ports: + pre_clean_process(port) + + + profiles_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "source", "server", "profiles") + + profile = "default.py" + if profile: + if not os.path.isfile(profile): + profile = os.path.join(profiles_dir, profile) + if not os.path.isfile(profile): + profile += ".py" + if not os.path.isfile(profile): + print(f"Invalid profile path: {profile}") + exit(1) + + + OI_CMD = f"interpreter --serve --profile {profile}" + oi_server = subprocess.Popen(OI_CMD, shell=True) + print("Interpreter server started") + + + print("Starting livekit server...") + debug = False + host = '0.0.0.0' + port = 10101 + if debug: + LK_CMD = f"livekit-server --dev --bind {host} --port {port}" + else: + LK_CMD = f"livekit-server --dev --bind {host} --port {port} > /dev/null 2>&1" + + lk_server = subprocess.Popen(LK_CMD, shell=True) + print("Livekit server started") + + lk_url = f"http://localhost:10101" + participant_token = str(api.AccessToken('devkey', 'secret') \ + .with_identity("Participant") \ + .with_name("You") \ + .with_grants(api.VideoGrants( + room_join=True, + room=ROOM_NAME,)) + .to_jwt()) + + processes = [lk_server, oi_server] + + client = 'meet' + if client == 'mobile': + listener = ngrok.forward(f"{host}:{port}", authtoken_from_env=True) + lk_url = listener.url() + print(f"Livekit server forwarded to: {lk_url}") + + print("Scan the QR code below with your mobile app to connect to the livekit server.") + content = json.dumps({"livekit_server": lk_url, "token": participant_token}) + qr_code = segno.make(content) + qr_code.terminal(compact=True) + else: # meet client + # Get the path to the meet client directory + meet_client_path = Path(__file__).parent / "source" / "clients" / "meet" + + print("Starting Next.js dev server...") + next_server = subprocess.Popen(["pnpm", "dev"], cwd=meet_client_path,) + print("Next.js dev server started") + + time.sleep(2) + meet_url = f'http://localhost:3000/custom?liveKitUrl={lk_url.replace("http", "ws")}&token={participant_token}' + print(f"\nOpening meet interface at: {meet_url}") + webbrowser.open(meet_url) + + processes.append(next_server) + + multimodal = False + try: + print("Starting worker...") + if multimodal: + multimodal_main(lk_url) + else: + worker_main(lk_url) + print("Worker started") + except KeyboardInterrupt: + print("\nReceived interrupt signal, shutting down...") + finally: + print("Cleaning up processes...") + cleanup_processes(processes) \ No newline at end of file