diff --git a/software/source/server/tunnel.py b/software/source/server/tunnel.py index a40c0f3..a749e07 100644 --- a/software/source/server/tunnel.py +++ b/software/source/server/tunnel.py @@ -1,142 +1,30 @@ -import subprocess -import re +import ngrok import pyqrcode -import time from ..utils.print_markdown import print_markdown - def create_tunnel( - tunnel_method="ngrok", server_host="localhost", server_port=10001, qr=False, domain=None -): - print_markdown("Exposing server to the internet...") - - server_url = "" - if tunnel_method == "bore": - try: - output = subprocess.check_output("command -v bore", shell=True) - except subprocess.CalledProcessError: - print( - "The bore-cli command is not available. Please run 'cargo install bore-cli'." - ) - print("For more information, see https://github.com/ekzhang/bore") - exit(1) - - time.sleep(6) - # output = subprocess.check_output(f'bore local {server_port} --to bore.pub', shell=True) - process = subprocess.Popen( - f"bore local {server_port} --to bore.pub", - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - universal_newlines=True, - ) - - while True: - line = process.stdout.readline() - print(line) - if not line: - break - if "listening at bore.pub:" in line: - remote_port = re.search("bore.pub:([0-9]*)", line).group(1) - server_url = f"bore.pub:{remote_port}" - print_markdown( - f"Your server is being hosted at the following URL: bore.pub:{remote_port}" - ) - break - - elif tunnel_method == "localtunnel": - if subprocess.call("command -v lt", shell=True): - print("The 'lt' command is not available.") - print( - "Please ensure you have Node.js installed, then run 'npm install -g localtunnel'." - ) - print( - "For more information, see https://github.com/localtunnel/localtunnel" - ) - exit(1) - else: - process = subprocess.Popen( - f"npx localtunnel --port {server_port}", - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - universal_newlines=True, - ) + server_host="localhost", server_port=10001, qr=False, domain=None +): + """ + To use most of ngrok’s features, you’ll need an authtoken. To obtain one, sign up for free at ngrok.com and + retrieve it from the authtoken page in your ngrok dashboard. - found_url = False - url_pattern = re.compile(r"your url is: https://[a-zA-Z0-9.-]+") + https://dashboard.ngrok.com/get-started/your-authtoken - while True: - line = process.stdout.readline() - if not line: - break # Break out of the loop if no more output - match = url_pattern.search(line) - if match: - found_url = True - remote_url = match.group(0).replace("your url is: ", "") - server_url = remote_url - print( - f"\nYour server is being hosted at the following URL: {remote_url}" - ) - break # Exit the loop once the URL is found - - if not found_url: - print( - "Failed to extract the localtunnel URL. Please check localtunnel's output for details." - ) - - elif tunnel_method == "ngrok": - # Check if ngrok is installed - is_installed = ( - subprocess.check_output("command -v ngrok", shell=True).decode().strip() - ) - if not is_installed: - print("The ngrok command is not available.") - print( - "Please install ngrok using the instructions at https://ngrok.com/docs/getting-started/" - ) - exit(1) - - # If ngrok is installed, start it on the specified port - # process = subprocess.Popen(f'ngrok http {server_port} --log=stdout', shell=True, stdout=subprocess.PIPE) - - if domain: - domain = f"--domain={domain}" - else: - domain = "" - process = subprocess.Popen( - f"ngrok http {server_port} --scheme http,https {domain} --log=stdout", - shell=True, - stdout=subprocess.PIPE, - ) - - # Initially, no URL is found - found_url = False - # Regular expression to match the ngrok URL - url_pattern = re.compile(r"https://[a-zA-Z0-9-]+\.ngrok(-free)?\.app") + You can set it as `NGROK_AUTHTOKEN` in your environment variables + """ + print_markdown("Exposing server to the internet...") - # Read the output line by line - while True: - line = process.stdout.readline().decode("utf-8") - if not line: - break # Break out of the loop if no more output - match = url_pattern.search(line) - if match: - found_url = True - remote_url = match.group(0) - server_url = remote_url - print( - f"\nYour server is being hosted at the following URL: {remote_url}" - ) - break # Exit the loop once the URL is found + if domain: + listener = ngrok.forward(f"{server_host}:{server_port}", authtoken_from_env=True, domain=domain) + else: + listener = ngrok.forward(f"{server_host}:{server_port}", authtoken_from_env=True) - if not found_url: - print( - "Failed to extract the ngrok tunnel URL. Please check ngrok's output for details." - ) + listener_url = listener.url() - if server_url and qr: - text = pyqrcode.create(remote_url) + print(f"Ingress established at: {listener_url}"); + if listener_url and qr: + text = pyqrcode.create(listener_url) print(text.terminal(quiet_zone=1)) - return server_url + return listener_url diff --git a/software/start.py b/software/start.py index 7fd2258..3b54766 100644 --- a/software/start.py +++ b/software/start.py @@ -1,5 +1,5 @@ import typer -import asyncio +import ngrok import platform import threading import os @@ -7,11 +7,18 @@ import importlib from source.server.tunnel import create_tunnel from source.server.async_server import start_server import subprocess +from livekit import api +import socket +import json +import segno +import time +from dotenv import load_dotenv import signal app = typer.Typer() +load_dotenv() @app.command() def run( @@ -26,9 +33,6 @@ def run( "--server-port", help="Specify the server port where the server will deploy", ), - tunnel_service: str = typer.Option( - "ngrok", "--tunnel-service", help="Specify the tunnel service" - ), expose: bool = typer.Option(False, "--expose", help="Expose server to internet"), client: bool = typer.Option(False, "--client", help="Run client"), server_url: str = typer.Option( @@ -60,13 +64,14 @@ def run( "--debug", help="Print latency measurements and save microphone recordings locally for manual playback.", ), - + livekit: bool = typer.Option( + False, "--livekit", help="Creates QR code for livekit server and token." + ), ): _run( server=server, server_host=server_host, server_port=server_port, - tunnel_service=tunnel_service, expose=expose, client=client, server_url=server_url, @@ -76,6 +81,7 @@ def run( domain=domain, profiles=profiles, profile=profile, + livekit=livekit, ) @@ -83,7 +89,6 @@ def _run( server: bool = False, server_host: str = "0.0.0.0", server_port: int = 10001, - tunnel_service: str = "bore", expose: bool = False, client: bool = False, server_url: str = None, @@ -93,6 +98,7 @@ def _run( domain = None, profiles = None, profile = None, + livekit: bool = False, ): profiles_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "source", "server", "profiles") @@ -124,7 +130,7 @@ def _run( if not server_url: server_url = f"{server_host}:{server_port}" - if not server and not client: + if not server and not client and not livekit: server = True client = True @@ -154,9 +160,9 @@ def _run( ) server_thread.start() - if expose: + if expose and not livekit: tunnel_thread = threading.Thread( - target=create_tunnel, args=[tunnel_service, server_host, server_port, qr, domain] + target=create_tunnel, args=[server_host, server_port, qr, domain] ) tunnel_thread.start() @@ -191,6 +197,71 @@ def _run( client_thread = threading.Thread(target=module.main, args=[server_url, debug, play_audio]) client_thread.start() + if livekit: + def run_command(command): + subprocess.run(command, shell=True, check=True) + + # Create threads for each command and store handles + interpreter_thread = threading.Thread( + target=run_command, args=("poetry run interpreter --server",) + ) + livekit_thread = threading.Thread( + target=run_command, args=('livekit-server --dev --bind "0.0.0.0"',) + ) + worker_thread = threading.Thread( + target=run_command, args=("python worker.py dev",) + ) + + threads = [interpreter_thread, livekit_thread, worker_thread] + + # Start all threads and set up logging for thread completion + for thread in threads: + thread.start() + time.sleep(7) + + # Create QR code + if expose and domain: + listener = ngrok.forward("localhost:7880", authtoken_from_env=True, domain=domain) + url= listener.url() + print(url) + content = json.dumps({"livekit_server": url}) + elif expose and not domain: + listener = ngrok.forward("localhost:7880", authtoken_from_env=True) + url= listener.url() + print(url) + content = json.dumps({"livekit_server": url}) + else: + # Get local IP address + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.connect(("8.8.8.8", 80)) + ip_address = s.getsockname()[0] + s.close() + + url = f"ws://{ip_address}:7880" + print(url) + content = json.dumps({"livekit_server": url}) + + qr_code = segno.make(content) + qr_code.terminal(compact=True) + + print("Mobile setup complete. Scan the QR code to connect.") + + def signal_handler(sig, frame): + print("Termination signal received. Shutting down...") + for thread in threads: + if thread.is_alive(): + # This will only work if the subprocess uses shell=True and the OS is Unix-like + subprocess.run(f"pkill -P {os.getpid()}", shell=True) + os._exit(0) + + # Register the signal handler + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + # Wait for all threads to complete + for thread in threads: + thread.join() + try: if server: server_thread.join() diff --git a/software/worker.py b/software/worker.py new file mode 100644 index 0000000..82b96c9 --- /dev/null +++ b/software/worker.py @@ -0,0 +1,76 @@ +import asyncio +import copy +import os + +from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, cli +from livekit.agents.llm import ChatContext, ChatMessage +from livekit import rtc +from livekit.agents.voice_assistant import VoiceAssistant +from livekit.plugins import deepgram, openai, silero, elevenlabs +from dotenv import load_dotenv + +load_dotenv() + + +# This function is the entrypoint for the agent. +async def entrypoint(ctx: JobContext): + # Create an initial chat context with a system prompt + initial_ctx = ChatContext().append( + role="system", + text=( + "You are a voice assistant created by LiveKit. Your interface with users will be voice. " + "You should use short and concise responses, and avoiding usage of unpronounceable punctuation." + ), + ) + + # Connect to the LiveKit room + await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY) + + # VoiceAssistant is a class that creates a full conversational AI agent. + # See https://github.com/livekit/agents/blob/main/livekit-agents/livekit/agents/voice_assistant/assistant.py + # for details on how it works. + open_interpreter = openai.LLM( + model="open-interpreter", base_url="http://0.0.0.0:8000/openai" + ) + assistant = VoiceAssistant( + vad=silero.VAD.load(), # Voice Activity Detection + stt=deepgram.STT(), # Speech-to-Text + llm=open_interpreter, # Language Model + tts=elevenlabs.TTS(), # Text-to-Speech + chat_ctx=initial_ctx, # Chat history context + ) + + chat = rtc.ChatManager(ctx.room) + + async def _answer_from_text(text: str): + chat_ctx = copy.deepcopy(assistant._chat_ctx) + chat_ctx.messages.append(ChatMessage(role="user", content=text)) + + stream = open_interpreter.chat(chat_ctx=chat_ctx) + await assistant.say(stream) + + @chat.on("message_received") + def on_chat_received(msg: rtc.ChatMessage): + print("RECEIVED MESSAGE OMG!!!!!!!!!!") + print("RECEIVED MESSAGE OMG!!!!!!!!!!") + print("RECEIVED MESSAGE OMG!!!!!!!!!!") + print("RECEIVED MESSAGE OMG!!!!!!!!!!") + if not msg.message: + return + + asyncio.create_task(_answer_from_text(msg.message)) + + # Start the voice assistant with the LiveKit room + assistant.start(ctx.room) + + await asyncio.sleep(1) + + # Greets the user with an initial message + await assistant.say("Hey, how can I help you today?", allow_interruptions=True) + + +if __name__ == "__main__": + # Initialize the worker with the entrypoint + cli.run_app( + WorkerOptions(entrypoint_fnc=entrypoint, api_key="devkey", api_secret="secret", port=8082) + )