draft main cli

pull/314/head
Ben Xu 3 weeks ago
parent f989731c08
commit ab8055e0de

@ -1,77 +1,68 @@
from yaspin import yaspin
spinner = yaspin()
spinner.start()
import sys
import subprocess
import time
import os
import typer
import ngrok
import platform
import threading
import os
import importlib
from source.server.server import start_server
import subprocess
import socket
import json
import webbrowser
import psutil
import ngrok
import segno
import json
from pathlib import Path
from livekit import api
import time
from dotenv import load_dotenv
import signal
from source.server.livekit.worker import main as worker_main
from source.server.livekit.multimodal import main as multimodal_main
import requests
import webbrowser
from pathlib import Path
import shutil
from dotenv import load_dotenv
load_dotenv()
system_type = platform.system()
load_dotenv()
system_type = platform.system()
app = typer.Typer()
def check_pnpm():
    """Verify that the `pnpm` executable can be found on PATH.

    Raises:
        typer.BadParameter: If `pnpm` cannot be located.
    """
    pnpm_path = shutil.which("pnpm")
    if pnpm_path:
        return
    raise typer.BadParameter(
        "pnpm is required to run the meet interface. Please install it first: https://pnpm.io/installation"
    )
ROOM_NAME = "my-room"
AGENT_NAME = "light"
def pre_clean_process(port, timeout=5.0):
    """Find and kill the first process that owns a connection on `port`.

    Args:
        port: Local port number to free.
        timeout: Seconds to wait for the process to exit after each signal.
            Without a bound, a process that ignores the terminate request
            would block this function forever.

    Returns:
        True if a matching process was found and stopped, False otherwise.
    """
    for proc in psutil.process_iter(['pid', 'name', 'connections']):
        try:
            for conn in proc.connections():
                # Defensive: skip connections that report no local address,
                # otherwise `.port` would raise AttributeError.
                if not conn.laddr or conn.laddr.port != port:
                    continue
                print(f"Killing process {proc.pid} ({proc.name()}) on port {port}")
                proc.terminate()
                try:
                    proc.wait(timeout=timeout)
                except psutil.TimeoutExpired:
                    # The polite request was ignored; escalate to a hard kill.
                    proc.kill()
                    proc.wait(timeout=timeout)
                return True
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.TimeoutExpired):
            # Process vanished, is not ours to inspect, or could not be
            # stopped in time: move on to the next candidate.
            pass
    return False
def cleanup_processes(processes, timeout=5.0):
    """Terminate every still-running child process and reap it.

    All running processes are asked to terminate first, and only then waited
    on, so a slow child does not delay signalling the others. A child that
    does not exit within `timeout` seconds is killed.

    Args:
        processes: Iterable of `subprocess.Popen`-like objects.
        timeout: Seconds to wait for each process after terminate().
    """
    # poll() returns None while the process is still running.
    running = [process for process in processes if process.poll() is None]
    for process in running:
        process.terminate()
    for process in running:
        try:
            process.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            # Terminate request was ignored; force the process down.
            process.kill()
            process.wait()
@app.command()
def run(
server: str = typer.Option(
None,
"--server",
help="Run server (accepts `livekit` or `light`)",
),
server_host: str = typer.Option(
host: str = typer.Option(
"0.0.0.0",
"--server-host",
help="Specify the server host where the server will deploy",
"--host",
help="Specify the server host where the livekit server will deploy. For other devices on your network to connect to it, keep it on default `0.0.0.0`",
),
server_port: int = typer.Option(
port: int = typer.Option(
10101,
"--server-port",
help="Specify the server port where the server will deploy",
),
expose: bool = typer.Option(False, "--expose", help="Expose server over the internet"),
domain: str = typer.Option(None, "--domain", help="Use `--expose` with a custom ngrok domain"),
client: str = typer.Option(None, "--client", help="Run client of a particular type. Accepts `light-python`, defaults to `light-python`"),
server_url: str = typer.Option(
None,
"--server-url",
help="Specify the server URL that the --client should expect. Defaults to server-host and server-port",
),
qr: bool = typer.Option(
False, "--qr", help="Display QR code containing the server connection information (will be ngrok url if `--expose` is used)"
"--port",
help="Specify the server port where the livekit server will deploy",
),
domain: str = typer.Option(None, "--domain", help="Pass in a custom ngrok domain to expose the livekit server over the internet"),
client: str = typer.Option(None, "--client", help="Run client of a particular type. Accepts `meet` or `mobile`, defaults to `meet`"),
profiles: bool = typer.Option(
False,
"--profiles",
@ -91,24 +82,12 @@ def run(
False,
"--multimodal",
help="Run the multimodal agent",
),
meet: bool = typer.Option(
False,
"--meet",
help="Run the web-based meeting interface locally",
),
):
threads = []
# Handle `01` with no arguments, which should start server + client
if not server and not client:
server = "light"
client = "light-python"
### PROFILES
profiles_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "source", "server", "profiles")
)
):
# preprocess ports
ports = [10101, 8000, 3000]
for port in ports:
pre_clean_process(port)
if profiles:
if platform.system() == "Windows":
@ -121,6 +100,8 @@ def run(
subprocess.Popen(['open', profiles_dir])
exit(0)
profiles_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "source", "server", "profiles")
if profile:
if not os.path.isfile(profile):
profile = os.path.join(profiles_dir, profile)
@ -130,262 +111,66 @@ def run(
print(f"Invalid profile path: {profile}")
exit(1)
# Load the profile module from the provided path
spec = importlib.util.spec_from_file_location("profile", profile)
profile_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(profile_module)
# Get the interpreter from the profile
interpreter = profile_module.interpreter
### SERVER
if system_type == "Windows":
server_host = "localhost"
if not server_url:
server_url = f"{server_host}:{server_port}"
if server:
### LIGHT SERVER (required by livekit)
if server == "light":
light_server_port = server_port
light_server_host = server_host
voice = True # The light server will support voice
elif server == "livekit":
# The light server should run at a different port if we want to run a livekit server
spinner.stop()
print(f"Starting light server (required for livekit server) on localhost, on the port before `--server-port` (port {server_port-1}), unless the `AN_OPEN_PORT` env var is set.")
print(f"The livekit server will be started on port {server_port}.")
light_server_port = os.getenv('AN_OPEN_PORT', server_port-1)
light_server_host = "localhost"
voice = False # The light server will NOT support voice. It will just run Open Interpreter. The Livekit server will handle voice
server_thread = threading.Thread(
target=start_server,
args=(
light_server_host,
light_server_port,
interpreter,
voice,
debug
),
)
spinner.stop()
print("Starting server...")
server_thread.start()
threads.append(server_thread)
if server == "livekit":
### LIVEKIT SERVER
def run_command(command):
i = 0
while True:
print("i is: ", i)
if i > 0:
process = subprocess.run(command, shell=True, check=True, preexec_fn=os.setsid)
else:
print("Skipping server start (first iteration)")
url = f"http://{server_host}:{server_port}"
while True:
time.sleep(5)
try:
print("Checking server status... with i = ", i)
response = requests.get(url)
if response.status_code == 200:
continue
else:
print("request failed: ", response.status_code)
break
except requests.RequestException:
print("request exception")
break
print("Server failed to start, retrying...")
try:
os.killpg(os.getpgid(process.pid), signal.SIGTERM) # This will fail when i=0
except ProcessLookupError:
pass
i += 1
# Start the livekit server
if debug:
command = f'livekit-server --dev --bind "{server_host}" --port {server_port}'
else:
command = f'livekit-server --dev --bind "{server_host}" --port {server_port} > /dev/null 2>&1'
livekit_thread = threading.Thread(
target=run_command, args=(command,)
)
livekit_thread.start()
threads.append(livekit_thread)
local_livekit_url = f"ws://{server_host}:{server_port}"
if expose:
### EXPOSE OVER INTERNET
listener = ngrok.forward(f"{server_host}:{server_port}", authtoken_from_env=True, domain=domain)
url = listener.url()
OI_CMD = f"interpreter --serve --profile {profile}"
oi_server = subprocess.Popen(OI_CMD, shell=True)
print("Interpreter server started")
else:
### GET LOCAL URL
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip_address = s.getsockname()[0]
s.close()
url = f"http://{ip_address}:{server_port}"
if server == "livekit":
print("Livekit server will run at:", url)
### CLIENT
if client:
module = importlib.import_module(
f".clients.{client}.client", package="source"
)
client_thread = threading.Thread(target=module.run, args=[server_url, debug])
spinner.stop()
print("Starting client...")
client_thread.start()
threads.append(client_thread)
### WAIT FOR THREADS TO FINISH, HANDLE CTRL-C
print("Starting livekit server...")
if debug:
LK_CMD = f"livekit-server --dev --bind {host} --port {port}"
else:
LK_CMD = f"livekit-server --dev --bind {host} --port {port} > /dev/null 2>&1"
lk_server = subprocess.Popen(LK_CMD, shell=True)
print("Livekit server started")
time.sleep(2)
# Signal handler for termination signals
def signal_handler(sig, frame):
print("Termination signal received. Shutting down...")
try:
# Kill all processes in our process group
os.killpg(os.getpid(), signal.SIGTERM)
# Additional cleanup for any remaining threads
for thread in threads:
if thread.is_alive():
try:
subprocess.run(f"pkill -P {os.getpid()}", shell=True)
except:
pass
except:
pass
finally:
os._exit(0)
# Register signal handler for SIGINT and SIGTERM
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
# Verify the server is running
for attempt in range(10):
try:
response = requests.get(url)
status = "OK" if response.status_code == 200 else "Not OK"
if status == "OK":
print("livekit server is running")
break
except requests.RequestException:
pass
time.sleep(1)
else:
raise Exception(f"Server at {url} failed to respond after 10 attempts")
participant_token = str(api.AccessToken('devkey', 'secret') \
lk_url = f"http://localhost:10101"
participant_token = str(api.AccessToken('devkey', 'secret') \
.with_identity("Participant") \
.with_name("You") \
.with_grants(api.VideoGrants(
room_join=True,
room=ROOM_NAME,))
.to_jwt())
### DISPLAY QR CODE
if qr:
def display_qr_code():
time.sleep(10)
content = json.dumps({"livekit_server": url, "token": participant_token})
qr_code = segno.make(content)
qr_code.terminal(compact=True)
qr_thread = threading.Thread(target=display_qr_code)
qr_thread.start()
threads.append(qr_thread)
if meet:
check_pnpm()
# Get the path to the meet client directory
meet_client_path = Path(__file__).parent / "source" / "clients" / "meet"
# Install dependencies if needed
spinner.text = "Installing meet client dependencies..."
subprocess.run(["pnpm", "install"], cwd=meet_client_path, check=True)
# Start the Next.js dev server in a separate thread
def run_next_server():
subprocess.run(["pnpm", "dev"], cwd=meet_client_path, check=True)
next_server_thread = threading.Thread(target=run_next_server)
next_server_thread.daemon = True
next_server_thread.start()
threads.append(next_server_thread)
# Wait for Next.js server to start
time.sleep(3)
processes = [lk_server, oi_server]
### START LIVEKIT WORKER
if server == "livekit":
time.sleep(1)
# These are needed to communicate with the worker's entrypoint
os.environ['INTERPRETER_SERVER_HOST'] = light_server_host
os.environ['INTERPRETER_SERVER_PORT'] = str(light_server_port)
os.environ['01_TTS'] = interpreter.tts
os.environ['01_STT'] = interpreter.stt
if client == 'mobile':
listener = ngrok.forward(f"{host}:{port}", authtoken_from_env=True, domain=domain)
lk_url = listener.url()
print(f"Livekit server forwarded to: {lk_url}")
if debug and not meet:
meet_url = f'http://localhost:3000/custom?liveKitUrl={url.replace("http", "ws")}&token={participant_token}\n\n'
print("\n")
print("For debugging, you can join a video call with your assistant. Click the link below, then send a chat message that says {CONTEXT_MODE_OFF}, then begin speaking:")
print(meet_url)
print("Scan the QR code below with your mobile app to connect to the livekit server.")
content = json.dumps({"livekit_server": lk_url, "token": participant_token})
qr_code = segno.make(content)
qr_code.terminal(compact=True)
else: # meet client
# Get the path to the meet client directory
meet_client_path = Path(__file__).parent / "source" / "clients" / "meet"
# Open the browser with the pre-configured URL
if meet:
meet_url = f'http://localhost:3000/custom?liveKitUrl={url.replace("http", "ws")}&token={participant_token}'
spinner.stop()
print(f"\nOpening meet interface at: {meet_url}")
webbrowser.open(meet_url)
try:
if multimodal:
multimodal_main(url)
else:
print("Starting worker...")
worker_main(url)
print("Starting Next.js dev server...")
next_server = subprocess.Popen(["pnpm", "dev"], cwd=meet_client_path,)
print("Next.js dev server started")
except KeyboardInterrupt:
print("Exiting.")
raise
except Exception as e:
print(f"Error occurred: {e}")
time.sleep(2)
meet_url = f'http://localhost:3001/custom?liveKitUrl={lk_url.replace("http", "ws")}&token={participant_token}'
print(f"\nOpening meet interface at: {meet_url}")
webbrowser.open(meet_url)
# Wait for all threads to complete
for thread in threads:
thread.join()
processes.append(next_server)
try:
print("Starting worker...")
if multimodal:
multimodal_main(lk_url)
else:
worker_main(lk_url)
print("Worker started")
except KeyboardInterrupt:
# On KeyboardInterrupt, send SIGINT to self
os.kill(os.getpid(), signal.SIGINT)
print("\nReceived interrupt signal, shutting down...")
finally:
print("Cleaning up processes...")
cleanup_processes(processes)

@ -0,0 +1,136 @@
import subprocess
import time
import os
import typer
import platform
import webbrowser
import psutil
import ngrok
import segno
import json
from pathlib import Path
from livekit import api
from source.server.livekit.worker import main as worker_main
from source.server.livekit.multimodal import main as multimodal_main
from dotenv import load_dotenv
load_dotenv()
system_type = platform.system()
app = typer.Typer()
ROOM_NAME = "my-room"
def pre_clean_process(port, timeout=5.0):
    """Find and kill the first process that owns a connection on `port`.

    Args:
        port: Local port number to free.
        timeout: Seconds to wait for the process to exit after each signal.
            Without a bound, a process that ignores the terminate request
            would block this function forever.

    Returns:
        True if a matching process was found and stopped, False otherwise.
    """
    for proc in psutil.process_iter(['pid', 'name', 'connections']):
        try:
            for conn in proc.connections():
                # Defensive: skip connections that report no local address,
                # otherwise `.port` would raise AttributeError.
                if not conn.laddr or conn.laddr.port != port:
                    continue
                print(f"Killing process {proc.pid} ({proc.name()}) on port {port}")
                proc.terminate()
                try:
                    proc.wait(timeout=timeout)
                except psutil.TimeoutExpired:
                    # The polite request was ignored; escalate to a hard kill.
                    proc.kill()
                    proc.wait(timeout=timeout)
                return True
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.TimeoutExpired):
            # Process vanished, is not ours to inspect, or could not be
            # stopped in time: move on to the next candidate.
            pass
    return False
def cleanup_processes(processes, timeout=5.0):
    """Terminate every still-running child process and reap it.

    All running processes are asked to terminate first, and only then waited
    on, so a slow child does not delay signalling the others. A child that
    does not exit within `timeout` seconds is killed.

    Args:
        processes: Iterable of `subprocess.Popen`-like objects.
        timeout: Seconds to wait for each process after terminate().
    """
    # poll() returns None while the process is still running.
    running = [process for process in processes if process.poll() is None]
    for process in running:
        process.terminate()
    for process in running:
        try:
            process.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            # Terminate request was ignored; force the process down.
            process.kill()
            process.wait()
if __name__ == "__main__":
    # Free the ports this script is about to use. 10101 is the livekit port
    # and 3000 is where the meet interface is opened below; 8000 is
    # presumably the interpreter server -- TODO confirm.
    ports = [10101, 8000, 3000]
    for port in ports:
        pre_clean_process(port)

    profiles_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "source", "server", "profiles")

    # Resolve the profile: first as given, then inside profiles_dir, then
    # with a ".py" suffix appended. Give up if none of those is a file.
    profile = "default.py"
    if profile:
        if not os.path.isfile(profile):
            profile = os.path.join(profiles_dir, profile)
            if not os.path.isfile(profile):
                profile += ".py"
                if not os.path.isfile(profile):
                    print(f"Invalid profile path: {profile}")
                    # SystemExit works even when the `site` builtins are absent.
                    raise SystemExit(1)

    debug = False
    host = '0.0.0.0'
    port = 10101
    client = 'meet'
    multimodal = False

    # Every child is registered here as soon as it is spawned, so the
    # `finally` below cleans up even if a later startup step raises.
    processes = []
    try:
        # Argument lists (no shell) keep a profile path containing spaces
        # intact, and make terminate() reach the server itself rather than
        # a wrapping shell process.
        OI_CMD = ["interpreter", "--serve", "--profile", profile]
        oi_server = subprocess.Popen(OI_CMD)
        processes.append(oi_server)
        print("Interpreter server started")

        print("Starting livekit server...")
        LK_CMD = ["livekit-server", "--dev", "--bind", host, "--port", str(port)]
        # Outside debug mode the livekit server's output is discarded.
        lk_output = None if debug else subprocess.DEVNULL
        lk_server = subprocess.Popen(LK_CMD, stdout=lk_output, stderr=lk_output)
        processes.append(lk_server)
        print("Livekit server started")

        # Derived from `port` so the URL follows the port the server binds to.
        lk_url = f"http://localhost:{port}"
        participant_token = str(
            api.AccessToken('devkey', 'secret')
            .with_identity("Participant")
            .with_name("You")
            .with_grants(api.VideoGrants(
                room_join=True,
                room=ROOM_NAME,
            ))
            .to_jwt()
        )

        if client == 'mobile':
            # Forward the livekit server through ngrok and show the
            # connection details as a QR code in the terminal.
            listener = ngrok.forward(f"{host}:{port}", authtoken_from_env=True)
            lk_url = listener.url()
            print(f"Livekit server forwarded to: {lk_url}")
            print("Scan the QR code below with your mobile app to connect to the livekit server.")
            content = json.dumps({"livekit_server": lk_url, "token": participant_token})
            qr_code = segno.make(content)
            qr_code.terminal(compact=True)
        else:  # meet client
            # Get the path to the meet client directory
            meet_client_path = Path(__file__).parent / "source" / "clients" / "meet"
            print("Starting Next.js dev server...")
            next_server = subprocess.Popen(["pnpm", "dev"], cwd=meet_client_path)
            processes.append(next_server)
            print("Next.js dev server started")
            # Give the dev server a moment to come up before opening the page.
            time.sleep(2)
            # Swap only the leading scheme (http -> ws, https -> wss).
            ws_url = lk_url.replace("http", "ws", 1)
            meet_url = f'http://localhost:3000/custom?liveKitUrl={ws_url}&token={participant_token}'
            print(f"\nOpening meet interface at: {meet_url}")
            webbrowser.open(meet_url)

        print("Starting worker...")
        if multimodal:
            multimodal_main(lk_url)
        else:
            worker_main(lk_url)
        print("Worker started")
    except KeyboardInterrupt:
        print("\nReceived interrupt signal, shutting down...")
    finally:
        print("Cleaning up processes...")
        cleanup_processes(processes)
Loading…
Cancel
Save