diff --git a/software/start.py b/software/start.py
index 7fd2258..1a3c105 100644
--- a/software/start.py
+++ b/software/start.py
@@ -7,6 +7,11 @@ import importlib
 from source.server.tunnel import create_tunnel
 from source.server.async_server import start_server
 import subprocess
+from livekit import api
+import socket
+import json
+import segno
+import time
 import signal
 
 
@@ -60,7 +65,9 @@ def run(
         "--debug",
         help="Print latency measurements and save microphone recordings locally for manual playback.",
     ),
-
+    livekit: bool = typer.Option(
+        False, "--livekit", help="Creates QR code for livekit server and token."
+    ),
 ):
     _run(
         server=server,
@@ -76,6 +83,7 @@ def run(
         domain=domain,
         profiles=profiles,
         profile=profile,
+        livekit=livekit,
     )
 
 
@@ -93,6 +101,7 @@ def _run(
     domain = None,
     profiles = None,
     profile = None,
+    livekit: bool = False,
 ):
 
     profiles_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "source", "server", "profiles")
@@ -124,7 +133,7 @@ def _run(
     if not server_url:
         server_url = f"{server_host}:{server_port}"
 
-    if not server and not client:
+    if not server and not client and not livekit:
         server = True
         client = True
 
@@ -191,6 +200,79 @@ def _run(
         client_thread = threading.Thread(target=module.main, args=[server_url, debug, play_audio])
         client_thread.start()
 
+    if livekit:
+        def run_command(command):
+            """Run one shell command line, raising CalledProcessError on a non-zero exit."""
+            subprocess.run(command, shell=True, check=True)
+
+        def getToken():
+            """Return a JWT that lets `identity` join the room `my-room`."""
+            # Same literal key/secret pair that worker.py passes to WorkerOptions.
+            token = (
+                api.AccessToken("devkey", "secret")
+                .with_identity("identity")
+                .with_name("my name")
+                .with_grants(
+                    api.VideoGrants(
+                        room_join=True,
+                        room="my-room",
+                    )
+                )
+            )
+            return token.to_jwt()
+
+        # Get local IP address. Connecting a UDP socket sends no packets; it only
+        # picks the outbound interface so getsockname() can report its address.
+        # The with-block closes the socket even if connect() raises.
+        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
+            s.connect(("8.8.8.8", 80))
+            ip_address = s.getsockname()[0]
+
+        # Create threads for each command and store handles
+        interpreter_thread = threading.Thread(
+            target=run_command, args=("poetry run interpreter --server",)
+        )
+        livekit_thread = threading.Thread(
+            target=run_command, args=('livekit-server --dev --bind "0.0.0.0"',)
+        )
+        worker_thread = threading.Thread(
+            target=run_command, args=("python worker.py dev",)
+        )
+
+        threads = [interpreter_thread, livekit_thread, worker_thread]
+
+        # Start the commands in list order, pausing 5 seconds after each one
+        for thread in threads:
+            thread.start()
+            time.sleep(5)
+
+        # Create QR code
+        url = f"ws://{ip_address}:7880"
+        token = getToken()
+        content = json.dumps({"livekit_server": url, "token": token})
+        qr_code = segno.make(content)
+        qr_code.terminal(compact=True)
+
+        print("Mobile setup complete. Scan the QR code to connect.")
+
+        def signal_handler(sig, frame):
+            """Kill this process's children if any command is still running, then exit."""
+            print("Termination signal received. Shutting down...")
+            # A single pkill reaches every child, so run it at most once
+            # instead of once per live thread.
+            if any(thread.is_alive() for thread in threads):
+                # This will only work if the subprocess uses shell=True and the OS is Unix-like
+                subprocess.run(f"pkill -P {os.getpid()}", shell=True)
+            os._exit(0)
+
+        # Register the signal handler
+        signal.signal(signal.SIGINT, signal_handler)
+        signal.signal(signal.SIGTERM, signal_handler)
+
+        # Wait for all threads to complete
+        for thread in threads:
+            thread.join()
+
     try:
         if server:
             server_thread.join()
diff --git a/software/worker.py b/software/worker.py
new file mode 100644
index 0000000..12b19ce
--- /dev/null
+++ b/software/worker.py
@@ -0,0 +1,88 @@
+import asyncio
+import copy
+import os
+
+from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, cli
+from livekit.agents.llm import ChatContext, ChatMessage
+from livekit import rtc
+from livekit.agents.voice_assistant import VoiceAssistant
+from livekit.plugins import deepgram, openai, silero, elevenlabs
+from dotenv import load_dotenv
+
+load_dotenv()
+
+
+# This function is the entrypoint for the agent.
+async def entrypoint(ctx: JobContext):
+    """Connect to the job's room, start the voice assistant and greet the user.
+
+    Text arriving on the room's chat channel is also answered aloud, using
+    the same LLM endpoint as the voice pipeline.
+    """
+    # Create an initial chat context with a system prompt
+    initial_ctx = ChatContext().append(
+        role="system",
+        text=(
+            "You are a voice assistant created by LiveKit. Your interface with users will be voice. "
+            "You should use short and concise responses, and avoiding usage of unpronounceable punctuation."
+        ),
+    )
+
+    # Connect to the LiveKit room
+    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
+
+    # VoiceAssistant is a class that creates a full conversational AI agent.
+    # See https://github.com/livekit/agents/blob/main/livekit-agents/livekit/agents/voice_assistant/assistant.py
+    # for details on how it works.
+    # The LLM endpoint is addressed as 127.0.0.1: 0.0.0.0 is a bind address
+    # and is not accepted as a connect destination on every platform.
+    open_interpreter = openai.LLM(
+        model="open-interpreter", base_url="http://127.0.0.1:8000/openai"
+    )
+    assistant = VoiceAssistant(
+        vad=silero.VAD.load(),  # Voice Activity Detection
+        stt=deepgram.STT(),  # Speech-to-Text
+        llm=open_interpreter,  # Language Model
+        tts=elevenlabs.TTS(),  # Text-to-Speech
+        chat_ctx=initial_ctx,  # Chat history context
+    )
+
+    chat = rtc.ChatManager(ctx.room)
+
+    # Hold strong references to in-flight reply tasks; the event loop keeps
+    # only weak ones, so an unreferenced task may be collected mid-run.
+    pending_replies = set()
+
+    async def _answer_from_text(text: str):
+        """Speak the LLM's reply to `text`, given a copy of the chat history."""
+        chat_ctx = copy.deepcopy(assistant._chat_ctx)
+        chat_ctx.messages.append(ChatMessage(role="user", content=text))
+
+        stream = open_interpreter.chat(chat_ctx=chat_ctx)
+        await assistant.say(stream)
+
+    @chat.on("message_received")
+    def on_chat_received(msg: rtc.ChatMessage):
+        """Schedule a spoken answer for each non-empty chat message."""
+        print("RECEIVED MESSAGE OMG!!!!!!!!!!")
+        if not msg.message:
+            return
+
+        reply_task = asyncio.create_task(_answer_from_text(msg.message))
+        pending_replies.add(reply_task)
+        reply_task.add_done_callback(pending_replies.discard)
+
+    # Start the voice assistant with the LiveKit room
+    assistant.start(ctx.room)
+
+    await asyncio.sleep(1)
+
+    # Greets the user with an initial message
+    await assistant.say("Hey, how can I help you today?", allow_interruptions=True)
+
+
+if __name__ == "__main__":
+    # Initialize the worker with the entrypoint
+    cli.run_app(
+        WorkerOptions(entrypoint_fnc=entrypoint, api_key="devkey", api_secret="secret")
+    )