from yaspin import yaspin
spinner = yaspin ( )
spinner . start ( )
import sys
import typer
import ngrok
import platform
import threading
import os
import importlib
from source . server . server import start_server
import subprocess
import socket
import json
import segno
from livekit import api
import time
from dotenv import load_dotenv
import signal
from source . server . livekit . worker import main as worker_main
from source . server . livekit . multimodal import main as multimodal_main
import requests
import webbrowser
from pathlib import Path
import shutil
load_dotenv ( )
system_type = platform . system ( )
app = typer . Typer ( )
def check_pnpm ( ) :
""" Check if pnpm is installed. """
if not shutil . which ( " pnpm " ) :
raise typer . BadParameter (
" pnpm is required to run the meet interface. Please install it first: https://pnpm.io/installation "
)
ROOM_NAME = " my-room "
AGENT_NAME = " light "
@app.command ( )
def run (
server : str = typer . Option (
None ,
" --server " ,
help = " Run server (accepts `livekit` or `light`) " ,
) ,
server_host : str = typer . Option (
" 0.0.0.0 " ,
" --server-host " ,
help = " Specify the server host where the server will deploy " ,
) ,
server_port : int = typer . Option (
10101 ,
" --server-port " ,
help = " Specify the server port where the server will deploy " ,
) ,
expose : bool = typer . Option ( False , " --expose " , help = " Expose server over the internet " ) ,
domain : str = typer . Option ( None , " --domain " , help = " Use `--expose` with a custom ngrok domain " ) ,
client : str = typer . Option ( None , " --client " , help = " Run client of a particular type. Accepts `light-python`, defaults to `light-python` " ) ,
server_url : str = typer . Option (
None ,
" --server-url " ,
help = " Specify the server URL that the --client should expect. Defaults to server-host and server-port " ,
) ,
qr : bool = typer . Option (
False , " --qr " , help = " Display QR code containing the server connection information (will be ngrok url if `--expose` is used) "
) ,
profiles : bool = typer . Option (
False ,
" --profiles " ,
help = " Opens the folder where profiles are contained " ,
) ,
profile : str = typer . Option (
" default.py " ,
" --profile " ,
help = " Specify the path to the profile, or the name of the file if it ' s in the `profiles` directory (run `--profiles` to open the profiles directory) " ,
) ,
debug : bool = typer . Option (
False ,
" --debug " ,
help = " Print latency measurements and save microphone recordings locally for manual playback " ,
) ,
multimodal : bool = typer . Option (
False ,
" --multimodal " ,
help = " Run the multimodal agent " ,
) ,
meet : bool = typer . Option (
False ,
" --meet " ,
help = " Run the web-based meeting interface locally " ,
) ,
) :
threads = [ ]
# Handle `01` with no arguments, which should start server + client
if not server and not client :
server = " light "
client = " light-python "
### PROFILES
profiles_dir = os . path . join ( os . path . dirname ( os . path . realpath ( __file__ ) ) , " source " , " server " , " profiles " )
if profiles :
if platform . system ( ) == " Windows " :
subprocess . Popen ( [ ' explorer ' , profiles_dir ] )
elif platform . system ( ) == " Darwin " :
subprocess . Popen ( [ ' open ' , profiles_dir ] )
elif platform . system ( ) == " Linux " :
subprocess . Popen ( [ ' xdg-open ' , profiles_dir ] )
else :
subprocess . Popen ( [ ' open ' , profiles_dir ] )
exit ( 0 )
if profile :
if not os . path . isfile ( profile ) :
profile = os . path . join ( profiles_dir , profile )
if not os . path . isfile ( profile ) :
profile + = " .py "
if not os . path . isfile ( profile ) :
print ( f " Invalid profile path: { profile } " )
exit ( 1 )
# Load the profile module from the provided path
spec = importlib . util . spec_from_file_location ( " profile " , profile )
profile_module = importlib . util . module_from_spec ( spec )
spec . loader . exec_module ( profile_module )
# Get the interpreter from the profile
interpreter = profile_module . interpreter
### SERVER
if system_type == " Windows " :
server_host = " localhost "
if not server_url :
server_url = f " { server_host } : { server_port } "
if server :
### LIGHT SERVER (required by livekit)
if server == " light " :
light_server_port = server_port
light_server_host = server_host
voice = True # The light server will support voice
elif server == " livekit " :
# The light server should run at a different port if we want to run a livekit server
spinner . stop ( )
print ( f " Starting light server (required for livekit server) on localhost, on the port before `--server-port` (port { server_port - 1 } ), unless the `AN_OPEN_PORT` env var is set. " )
print ( f " The livekit server will be started on port { server_port } . " )
light_server_port = os . getenv ( ' AN_OPEN_PORT ' , server_port - 1 )
light_server_host = " localhost "
voice = False # The light server will NOT support voice. It will just run Open Interpreter. The Livekit server will handle voice
server_thread = threading . Thread (
target = start_server ,
args = (
light_server_host ,
light_server_port ,
interpreter ,
voice ,
debug
) ,
)
spinner . stop ( )
print ( " Starting server... " )
server_thread . start ( )
threads . append ( server_thread )
if server == " livekit " :
### LIVEKIT SERVER
def run_command ( command ) :
i = 0
while True :
print ( " i is: " , i )
if i > 0 :
process = subprocess . run ( command , shell = True , check = True , preexec_fn = os . setsid )
else :
print ( " Skipping server start (first iteration) " )
url = f " http:// { server_host } : { server_port } "
while True :
time . sleep ( 5 )
try :
print ( " Checking server status... with i = " , i )
response = requests . get ( url )
if response . status_code == 200 :
continue
else :
print ( " request failed: " , response . status_code )
break
except requests . RequestException :
print ( " request exception " )
break
print ( " Server failed to start, retrying... " )
try :
os . killpg ( os . getpgid ( process . pid ) , signal . SIGTERM ) # This will fail when i=0
except ProcessLookupError :
pass
i + = 1
# Start the livekit server
if debug :
command = f ' livekit-server --dev --bind " { server_host } " --port { server_port } '
else :
command = f ' livekit-server --dev --bind " { server_host } " --port { server_port } > /dev/null 2>&1 '
livekit_thread = threading . Thread (
target = run_command , args = ( command , )
)
livekit_thread . start ( )
threads . append ( livekit_thread )
local_livekit_url = f " ws:// { server_host } : { server_port } "
if expose :
### EXPOSE OVER INTERNET
listener = ngrok . forward ( f " { server_host } : { server_port } " , authtoken_from_env = True , domain = domain )
url = listener . url ( )
else :
### GET LOCAL URL
s = socket . socket ( socket . AF_INET , socket . SOCK_DGRAM )
s . connect ( ( " 8.8.8.8 " , 80 ) )
ip_address = s . getsockname ( ) [ 0 ]
s . close ( )
url = f " http:// { ip_address } : { server_port } "
if server == " livekit " :
print ( " Livekit server will run at: " , url )
### CLIENT
if client :
module = importlib . import_module (
f " .clients. { client } .client " , package = " source "
)
client_thread = threading . Thread ( target = module . run , args = [ server_url , debug ] )
spinner . stop ( )
print ( " Starting client... " )
client_thread . start ( )
threads . append ( client_thread )
### WAIT FOR THREADS TO FINISH, HANDLE CTRL-C
# Signal handler for termination signals
def signal_handler ( sig , frame ) :
print ( " Termination signal received. Shutting down... " )
try :
# Kill all processes in our process group
os . killpg ( os . getpid ( ) , signal . SIGTERM )
# Additional cleanup for any remaining threads
for thread in threads :
if thread . is_alive ( ) :
try :
subprocess . run ( f " pkill -P { os . getpid ( ) } " , shell = True )
except :
pass
except :
pass
finally :
os . _exit ( 0 )
# Register signal handler for SIGINT and SIGTERM
signal . signal ( signal . SIGINT , signal_handler )
signal . signal ( signal . SIGTERM , signal_handler )
try :
# Verify the server is running
for attempt in range ( 10 ) :
try :
response = requests . get ( url )
status = " OK " if response . status_code == 200 else " Not OK "
if status == " OK " :
print ( " livekit server is running " )
break
except requests . RequestException :
pass
time . sleep ( 1 )
else :
raise Exception ( f " Server at { url } failed to respond after 10 attempts " )
participant_token = str ( api . AccessToken ( ' devkey ' , ' secret ' ) \
. with_identity ( " Participant " ) \
. with_name ( " You " ) \
. with_grants ( api . VideoGrants (
room_join = True ,
room = ROOM_NAME , ) )
. to_jwt ( ) )
### DISPLAY QR CODE
if qr :
def display_qr_code ( ) :
time . sleep ( 10 )
content = json . dumps ( { " livekit_server " : url , " token " : participant_token } )
qr_code = segno . make ( content )
qr_code . terminal ( compact = True )
qr_thread = threading . Thread ( target = display_qr_code )
qr_thread . start ( )
threads . append ( qr_thread )
if meet :
check_pnpm ( )
# Get the path to the meet client directory
meet_client_path = Path ( __file__ ) . parent / " source " / " clients " / " meet "
# Install dependencies if needed
spinner . text = " Installing meet client dependencies... "
subprocess . run ( [ " pnpm " , " install " ] , cwd = meet_client_path , check = True )
# Start the Next.js dev server in a separate thread
def run_next_server ( ) :
subprocess . run ( [ " pnpm " , " dev " ] , cwd = meet_client_path , check = True )
next_server_thread = threading . Thread ( target = run_next_server )
next_server_thread . daemon = True
next_server_thread . start ( )
threads . append ( next_server_thread )
# Wait for Next.js server to start
time . sleep ( 3 )
### START LIVEKIT WORKER
if server == " livekit " :
time . sleep ( 1 )
# These are needed to communicate with the worker's entrypoint
os . environ [ ' INTERPRETER_SERVER_HOST ' ] = light_server_host
os . environ [ ' INTERPRETER_SERVER_PORT ' ] = str ( light_server_port )
os . environ [ ' 01_TTS ' ] = interpreter . tts
os . environ [ ' 01_STT ' ] = interpreter . stt
if debug and not meet :
meet_url = f ' http://localhost:3000/custom?liveKitUrl= { url . replace ( " http " , " ws " ) } &token= { participant_token } \n \n '
print ( " \n " )
print ( " For debugging, you can join a video call with your assistant. Click the link below, then send a chat message that says {CONTEXT_MODE_OFF} , then begin speaking: " )
print ( meet_url )
# Open the browser with the pre-configured URL
if meet :
meet_url = f ' http://localhost:3000/custom?liveKitUrl= { url . replace ( " http " , " ws " ) } &token= { participant_token } '
spinner . stop ( )
print ( f " \n Opening meet interface at: { meet_url } " )
webbrowser . open ( meet_url )
try :
if multimodal :
multimodal_main ( url )
else :
print ( " Starting worker... " )
worker_main ( url )
except KeyboardInterrupt :
print ( " Exiting. " )
raise
except Exception as e :
print ( f " Error occurred: { e } " )
# Wait for all threads to complete
for thread in threads :
thread . join ( )
except KeyboardInterrupt :
# On KeyboardInterrupt, send SIGINT to self
os . kill ( os . getpid ( ) , signal . SIGINT )