import asyncio import copy import os from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, cli from livekit.agents.llm import ChatContext, ChatMessage from livekit import rtc from livekit.agents.voice_assistant import VoiceAssistant from livekit.plugins import deepgram, openai, silero, elevenlabs, cartesia from dotenv import load_dotenv import sys import numpy as np load_dotenv() start_message = """Hi! You can hold the white circle below to speak to me. Try asking what I can do.""" # This function is the entrypoint for the agent. async def entrypoint(ctx: JobContext): # Create an initial chat context with a system prompt initial_ctx = ChatContext().append( role="system", text=( "" # Open Interpreter handles this. ), ) # Connect to the LiveKit room await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY) # Create a black background with a white circle width, height = 640, 480 image_np = np.zeros((height, width, 4), dtype=np.uint8) # Create a white circle center = (width // 2, height // 2) radius = 50 y, x = np.ogrid[:height, :width] mask = ((x - center[0])**2 + (y - center[1])**2) <= radius**2 image_np[mask] = [255, 255, 255, 255] # White color with full opacity source = rtc.VideoSource(width, height) track = rtc.LocalVideoTrack.create_video_track("static_image", source) options = rtc.TrackPublishOptions() options.source = rtc.TrackSource.SOURCE_CAMERA publication = await ctx.room.local_participant.publish_track(track, options) # Function to continuously publish the static image async def publish_static_image(): while True: frame = rtc.VideoFrame(width, height, rtc.VideoBufferType.RGBA, image_np.tobytes()) source.capture_frame(frame) await asyncio.sleep(1/30) # Publish at 30 fps # Start publishing the static image asyncio.create_task(publish_static_image()) # VoiceAssistant is a class that creates a full conversational AI agent. # See https://github.com/livekit/agents/blob/main/livekit-agents/livekit/agents/voice_assistant/assistant.py # for details on how it works. interpreter_server_host = os.getenv('INTERPRETER_SERVER_HOST', 'localhost') interpreter_server_port = os.getenv('INTERPRETER_SERVER_PORT', '8000') base_url = f"http://{interpreter_server_host}:{interpreter_server_port}/openai" # For debugging # base_url = "http://127.0.0.1:8000/openai" open_interpreter = openai.LLM( model="open-interpreter", base_url=base_url, api_key="x" ) tts_provider = os.getenv('01_TTS', '').lower() stt_provider = os.getenv('01_STT', '').lower() # Add plugins here if tts_provider == 'openai': tts = openai.TTS() elif tts_provider == 'elevenlabs': tts = elevenlabs.TTS() elif tts_provider == 'cartesia': tts = cartesia.TTS() else: raise ValueError(f"Unsupported TTS provider: {tts_provider}. Please set 01_TTS environment variable to 'openai' or 'elevenlabs'.") if stt_provider == 'deepgram': stt = deepgram.STT() else: raise ValueError(f"Unsupported STT provider: {stt_provider}. Please set 01_STT environment variable to 'deepgram'.") assistant = VoiceAssistant( vad=silero.VAD.load(), # Voice Activity Detection stt=stt, # Speech-to-Text llm=open_interpreter, # Language Model tts=tts, # Text-to-Speech chat_ctx=initial_ctx, # Chat history context ) chat = rtc.ChatManager(ctx.room) async def _answer_from_text(text: str): chat_ctx = copy.deepcopy(assistant._chat_ctx) chat_ctx.messages.append(ChatMessage(role="user", content=text)) stream = open_interpreter.chat(chat_ctx=chat_ctx) await assistant.say(stream) @chat.on("message_received") def on_chat_received(msg: rtc.ChatMessage): if not msg.message: return asyncio.create_task(_answer_from_text(msg.message)) # Start the voice assistant with the LiveKit room assistant.start(ctx.room) await asyncio.sleep(1) # Greets the user with an initial message await assistant.say(start_message, allow_interruptions=True) def main(livekit_url): # Workers have to be run as CLIs right now. # So we need to simualte running "[this file] dev" # Modify sys.argv to set the path to this file as the first argument # and 'dev' as the second argument sys.argv = [str(__file__), 'dev'] # Initialize the worker with the entrypoint cli.run_app( WorkerOptions(entrypoint_fnc=entrypoint, api_key="devkey", api_secret="secret", ws_url=livekit_url) )