You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
01/software/source/server/livekit/worker.py

121 lines
4.1 KiB

import asyncio
import copy
import os
from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, cli
from livekit.agents.llm import ChatContext, ChatMessage
from livekit import rtc
from livekit.agents.voice_assistant import VoiceAssistant
from livekit.plugins import deepgram, openai, silero, elevenlabs
from dotenv import load_dotenv
import sys
import numpy as np
load_dotenv()
start_message = """Hi! You can hold the white circle below to speak to me.
Try asking what I can do."""
# This function is the entrypoint for the agent.
async def entrypoint(ctx: JobContext):
# Create an initial chat context with a system prompt
initial_ctx = ChatContext().append(
role="system",
text=(
"" # Open Interpreter handles this.
),
)
# Connect to the LiveKit room
await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
# Create a black background with a white circle
width, height = 640, 480
image_np = np.zeros((height, width, 4), dtype=np.uint8)
# Create a white circle
center = (width // 2, height // 2)
radius = 50
y, x = np.ogrid[:height, :width]
mask = ((x - center[0])**2 + (y - center[1])**2) <= radius**2
image_np[mask] = [255, 255, 255, 255] # White color with full opacity
source = rtc.VideoSource(width, height)
track = rtc.LocalVideoTrack.create_video_track("static_image", source)
options = rtc.TrackPublishOptions()
options.source = rtc.TrackSource.SOURCE_CAMERA
publication = await ctx.room.local_participant.publish_track(track, options)
# Function to continuously publish the static image
async def publish_static_image():
while True:
frame = rtc.VideoFrame(width, height, rtc.VideoBufferType.RGBA, image_np.tobytes())
source.capture_frame(frame)
await asyncio.sleep(1/30) # Publish at 30 fps
# Start publishing the static image
asyncio.create_task(publish_static_image())
# VoiceAssistant is a class that creates a full conversational AI agent.
# See https://github.com/livekit/agents/blob/main/livekit-agents/livekit/agents/voice_assistant/assistant.py
# for details on how it works.
interpreter_server_host = os.getenv('INTERPRETER_SERVER_HOST', 'localhost')
interpreter_server_port = os.getenv('INTERPRETER_SERVER_PORT', '8000')
base_url = f"http://{interpreter_server_host}:{interpreter_server_port}/openai"
# For debugging
# base_url = "http://127.0.0.1:8000/openai"
open_interpreter = openai.LLM(
model="open-interpreter", base_url=base_url, api_key="x"
)
assistant = VoiceAssistant(
vad=silero.VAD.load(), # Voice Activity Detection
stt=deepgram.STT(), # Speech-to-Text
llm=open_interpreter, # Language Model
#tts=elevenlabs.TTS(), # Text-to-Speech
tts=openai.TTS(), # Text-to-Speech
chat_ctx=initial_ctx, # Chat history context
)
chat = rtc.ChatManager(ctx.room)
async def _answer_from_text(text: str):
chat_ctx = copy.deepcopy(assistant._chat_ctx)
chat_ctx.messages.append(ChatMessage(role="user", content=text))
stream = open_interpreter.chat(chat_ctx=chat_ctx)
await assistant.say(stream)
@chat.on("message_received")
def on_chat_received(msg: rtc.ChatMessage):
if not msg.message:
return
asyncio.create_task(_answer_from_text(msg.message))
# Start the voice assistant with the LiveKit room
assistant.start(ctx.room)
await asyncio.sleep(1)
# Greets the user with an initial message
await assistant.say(start_message,
allow_interruptions=True)
def main(livekit_url):
# Workers have to be run as CLIs right now.
# So we need to simualte running "[this file] dev"
# Modify sys.argv to set the path to this file as the first argument
# and 'dev' as the second argument
sys.argv = [str(__file__), 'dev']
# Initialize the worker with the entrypoint
cli.run_app(
WorkerOptions(entrypoint_fnc=entrypoint, api_key="devkey", api_secret="secret", ws_url=livekit_url)
)