Merge pull request #293 from benxu3/livekit

add livekit command
pull/296/head
killian 5 months ago committed by GitHub
commit 26adb89c0e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,142 +1,30 @@
import subprocess
import re
import ngrok
import pyqrcode
import time
from ..utils.print_markdown import print_markdown
def create_tunnel(
tunnel_method="ngrok", server_host="localhost", server_port=10001, qr=False, domain=None
server_host="localhost", server_port=10001, qr=False, domain=None
):
print_markdown("Exposing server to the internet...")
server_url = ""
if tunnel_method == "bore":
try:
output = subprocess.check_output("command -v bore", shell=True)
except subprocess.CalledProcessError:
print(
"The bore-cli command is not available. Please run 'cargo install bore-cli'."
)
print("For more information, see https://github.com/ekzhang/bore")
exit(1)
time.sleep(6)
# output = subprocess.check_output(f'bore local {server_port} --to bore.pub', shell=True)
process = subprocess.Popen(
f"bore local {server_port} --to bore.pub",
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
)
while True:
line = process.stdout.readline()
print(line)
if not line:
break
if "listening at bore.pub:" in line:
remote_port = re.search("bore.pub:([0-9]*)", line).group(1)
server_url = f"bore.pub:{remote_port}"
print_markdown(
f"Your server is being hosted at the following URL: bore.pub:{remote_port}"
)
break
elif tunnel_method == "localtunnel":
if subprocess.call("command -v lt", shell=True):
print("The 'lt' command is not available.")
print(
"Please ensure you have Node.js installed, then run 'npm install -g localtunnel'."
)
print(
"For more information, see https://github.com/localtunnel/localtunnel"
)
exit(1)
else:
process = subprocess.Popen(
f"npx localtunnel --port {server_port}",
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
)
"""
To use most of ngroks features, youll need an authtoken. To obtain one, sign up for free at ngrok.com and
retrieve it from the authtoken page in your ngrok dashboard.
found_url = False
url_pattern = re.compile(r"your url is: https://[a-zA-Z0-9.-]+")
https://dashboard.ngrok.com/get-started/your-authtoken
while True:
line = process.stdout.readline()
if not line:
break # Break out of the loop if no more output
match = url_pattern.search(line)
if match:
found_url = True
remote_url = match.group(0).replace("your url is: ", "")
server_url = remote_url
print(
f"\nYour server is being hosted at the following URL: {remote_url}"
)
break # Exit the loop once the URL is found
if not found_url:
print(
"Failed to extract the localtunnel URL. Please check localtunnel's output for details."
)
elif tunnel_method == "ngrok":
# Check if ngrok is installed
is_installed = (
subprocess.check_output("command -v ngrok", shell=True).decode().strip()
)
if not is_installed:
print("The ngrok command is not available.")
print(
"Please install ngrok using the instructions at https://ngrok.com/docs/getting-started/"
)
exit(1)
# If ngrok is installed, start it on the specified port
# process = subprocess.Popen(f'ngrok http {server_port} --log=stdout', shell=True, stdout=subprocess.PIPE)
You can set it as `NGROK_AUTHTOKEN` in your environment variables
"""
print_markdown("Exposing server to the internet...")
if domain:
domain = f"--domain={domain}"
listener = ngrok.forward(f"{server_host}:{server_port}", authtoken_from_env=True, domain=domain)
else:
domain = ""
process = subprocess.Popen(
f"ngrok http {server_port} --scheme http,https {domain} --log=stdout",
shell=True,
stdout=subprocess.PIPE,
)
# Initially, no URL is found
found_url = False
# Regular expression to match the ngrok URL
url_pattern = re.compile(r"https://[a-zA-Z0-9-]+\.ngrok(-free)?\.app")
# Read the output line by line
while True:
line = process.stdout.readline().decode("utf-8")
if not line:
break # Break out of the loop if no more output
match = url_pattern.search(line)
if match:
found_url = True
remote_url = match.group(0)
server_url = remote_url
print(
f"\nYour server is being hosted at the following URL: {remote_url}"
)
break # Exit the loop once the URL is found
listener = ngrok.forward(f"{server_host}:{server_port}", authtoken_from_env=True)
if not found_url:
print(
"Failed to extract the ngrok tunnel URL. Please check ngrok's output for details."
)
listener_url = listener.url()
if server_url and qr:
text = pyqrcode.create(remote_url)
print(f"Ingress established at: {listener_url}");
if listener_url and qr:
text = pyqrcode.create(listener_url)
print(text.terminal(quiet_zone=1))
return server_url
return listener_url

@ -1,5 +1,5 @@
import typer
import asyncio
import ngrok
import platform
import threading
import os
@ -7,11 +7,18 @@ import importlib
from source.server.tunnel import create_tunnel
from source.server.async_server import start_server
import subprocess
from livekit import api
import socket
import json
import segno
import time
from dotenv import load_dotenv
import signal
app = typer.Typer()
load_dotenv()
@app.command()
def run(
@ -26,9 +33,6 @@ def run(
"--server-port",
help="Specify the server port where the server will deploy",
),
tunnel_service: str = typer.Option(
"ngrok", "--tunnel-service", help="Specify the tunnel service"
),
expose: bool = typer.Option(False, "--expose", help="Expose server to internet"),
client: bool = typer.Option(False, "--client", help="Run client"),
server_url: str = typer.Option(
@ -60,13 +64,14 @@ def run(
"--debug",
help="Print latency measurements and save microphone recordings locally for manual playback.",
),
livekit: bool = typer.Option(
False, "--livekit", help="Creates QR code for livekit server and token."
),
):
_run(
server=server,
server_host=server_host,
server_port=server_port,
tunnel_service=tunnel_service,
expose=expose,
client=client,
server_url=server_url,
@ -76,6 +81,7 @@ def run(
domain=domain,
profiles=profiles,
profile=profile,
livekit=livekit,
)
@ -83,7 +89,6 @@ def _run(
server: bool = False,
server_host: str = "0.0.0.0",
server_port: int = 10001,
tunnel_service: str = "bore",
expose: bool = False,
client: bool = False,
server_url: str = None,
@ -93,6 +98,7 @@ def _run(
domain = None,
profiles = None,
profile = None,
livekit: bool = False,
):
profiles_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "source", "server", "profiles")
@ -124,7 +130,7 @@ def _run(
if not server_url:
server_url = f"{server_host}:{server_port}"
if not server and not client:
if not server and not client and not livekit:
server = True
client = True
@ -154,9 +160,9 @@ def _run(
)
server_thread.start()
if expose:
if expose and not livekit:
tunnel_thread = threading.Thread(
target=create_tunnel, args=[tunnel_service, server_host, server_port, qr, domain]
target=create_tunnel, args=[server_host, server_port, qr, domain]
)
tunnel_thread.start()
@ -191,6 +197,71 @@ def _run(
client_thread = threading.Thread(target=module.main, args=[server_url, debug, play_audio])
client_thread.start()
if livekit:
def run_command(command):
subprocess.run(command, shell=True, check=True)
# Create threads for each command and store handles
interpreter_thread = threading.Thread(
target=run_command, args=("poetry run interpreter --server",)
)
livekit_thread = threading.Thread(
target=run_command, args=('livekit-server --dev --bind "0.0.0.0"',)
)
worker_thread = threading.Thread(
target=run_command, args=("python worker.py dev",)
)
threads = [interpreter_thread, livekit_thread, worker_thread]
# Start all threads and set up logging for thread completion
for thread in threads:
thread.start()
time.sleep(7)
# Create QR code
if expose and domain:
listener = ngrok.forward("localhost:7880", authtoken_from_env=True, domain=domain)
url= listener.url()
print(url)
content = json.dumps({"livekit_server": url})
elif expose and not domain:
listener = ngrok.forward("localhost:7880", authtoken_from_env=True)
url= listener.url()
print(url)
content = json.dumps({"livekit_server": url})
else:
# Get local IP address
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip_address = s.getsockname()[0]
s.close()
url = f"ws://{ip_address}:7880"
print(url)
content = json.dumps({"livekit_server": url})
qr_code = segno.make(content)
qr_code.terminal(compact=True)
print("Mobile setup complete. Scan the QR code to connect.")
def signal_handler(sig, frame):
print("Termination signal received. Shutting down...")
for thread in threads:
if thread.is_alive():
# This will only work if the subprocess uses shell=True and the OS is Unix-like
subprocess.run(f"pkill -P {os.getpid()}", shell=True)
os._exit(0)
# Register the signal handler
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Wait for all threads to complete
for thread in threads:
thread.join()
try:
if server:
server_thread.join()

@ -0,0 +1,76 @@
import asyncio
import copy
import os
from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, cli
from livekit.agents.llm import ChatContext, ChatMessage
from livekit import rtc
from livekit.agents.voice_assistant import VoiceAssistant
from livekit.plugins import deepgram, openai, silero, elevenlabs
from dotenv import load_dotenv
load_dotenv()
# This function is the entrypoint for the agent.
async def entrypoint(ctx: JobContext):
# Create an initial chat context with a system prompt
initial_ctx = ChatContext().append(
role="system",
text=(
"You are a voice assistant created by LiveKit. Your interface with users will be voice. "
"You should use short and concise responses, and avoiding usage of unpronounceable punctuation."
),
)
# Connect to the LiveKit room
await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
# VoiceAssistant is a class that creates a full conversational AI agent.
# See https://github.com/livekit/agents/blob/main/livekit-agents/livekit/agents/voice_assistant/assistant.py
# for details on how it works.
open_interpreter = openai.LLM(
model="open-interpreter", base_url="http://0.0.0.0:8000/openai"
)
assistant = VoiceAssistant(
vad=silero.VAD.load(), # Voice Activity Detection
stt=deepgram.STT(), # Speech-to-Text
llm=open_interpreter, # Language Model
tts=elevenlabs.TTS(), # Text-to-Speech
chat_ctx=initial_ctx, # Chat history context
)
chat = rtc.ChatManager(ctx.room)
async def _answer_from_text(text: str):
chat_ctx = copy.deepcopy(assistant._chat_ctx)
chat_ctx.messages.append(ChatMessage(role="user", content=text))
stream = open_interpreter.chat(chat_ctx=chat_ctx)
await assistant.say(stream)
@chat.on("message_received")
def on_chat_received(msg: rtc.ChatMessage):
print("RECEIVED MESSAGE OMG!!!!!!!!!!")
print("RECEIVED MESSAGE OMG!!!!!!!!!!")
print("RECEIVED MESSAGE OMG!!!!!!!!!!")
print("RECEIVED MESSAGE OMG!!!!!!!!!!")
if not msg.message:
return
asyncio.create_task(_answer_from_text(msg.message))
# Start the voice assistant with the LiveKit room
assistant.start(ctx.room)
await asyncio.sleep(1)
# Greets the user with an initial message
await assistant.say("Hey, how can I help you today?", allow_interruptions=True)
if __name__ == "__main__":
# Initialize the worker with the entrypoint
cli.run_app(
WorkerOptions(entrypoint_fnc=entrypoint, api_key="devkey", api_secret="secret", port=8082)
)
Loading…
Cancel
Save