Change worker to be a Python file

pull/300/head
killian 4 months ago
parent d94b18d59b
commit bb53f9f81f

@ -1,16 +1,3 @@
"""
01 # Runs light server and light simulator
01 --server livekit # Runs livekit server only
01 --server light # Runs light server only
01 --client light-python
... --expose # Exposes the server with ngrok
... --expose --domain <domain> # Exposes the server on a specific ngrok domain
... --qr # Displays a qr code
"""
from yaspin import yaspin
spinner = yaspin()
spinner.start()
@ -29,6 +16,9 @@ import segno
import time
from dotenv import load_dotenv
import signal
from source.server.livekit.worker import main as worker_main
import warnings
import requests
load_dotenv()
@ -162,24 +152,13 @@ def run(
# Start the livekit server
livekit_thread = threading.Thread(
target=run_command, args=(f'livekit-server --dev --bind "{server_host}" --port {server_port}',)
target=run_command, args=(f'livekit-server --dev --bind "{server_host}" --port {server_port} > /dev/null 2>&1',)
)
time.sleep(7)
livekit_thread.start()
threads.append(livekit_thread)
# We communicate with the livekit worker via environment variables:
os.environ["INTERPRETER_SERVER_HOST"] = server_host
os.environ["INTERPRETER_LIGHT_SERVER_PORT"] = str(light_server_port)
os.environ["LIVEKIT_URL"] = f"ws://{server_host}:{server_port}"
# Start the livekit worker
worker_thread = threading.Thread(
target=run_command, args=("python source/server/livekit/worker.py dev",) # TODO: This should not be a CLI, it should just run the python file
)
time.sleep(7)
worker_thread.start()
threads.append(worker_thread)
livekit_url = f"ws://{server_host}:{server_port}"
if expose:
@ -200,7 +179,6 @@ def run(
if server == "livekit":
print("Livekit server will run at:", url)
### DISPLAY QR CODE
if qr:
@ -241,6 +219,28 @@ def run(
signal.signal(signal.SIGTERM, signal_handler)
try:
# Verify the server is running
for attempt in range(10):
try:
response = requests.get(url)
status = "OK" if response.status_code == 200 else "Not OK"
if status == "OK":
break
except requests.RequestException:
pass
time.sleep(1)
else:
raise Exception(f"Server at {url} failed to respond after 10 attempts")
# Start the livekit worker
if server == "livekit":
time.sleep(7)
# These are needed to communicate with the worker's entrypoint
os.environ['INTERPRETER_SERVER_HOST'] = light_server_host
os.environ['INTERPRETER_SERVER_PORT'] = str(light_server_port)
worker_main(livekit_url)
# Wait for all threads to complete
for thread in threads:
thread.join()

@ -7,9 +7,15 @@ from livekit import rtc
from livekit.agents.voice_assistant import VoiceAssistant
from livekit.plugins import deepgram, openai, silero, elevenlabs
from dotenv import load_dotenv
import sys
import numpy as np
load_dotenv()
start_message = """Hi! You can hold the white circle below to speak to me.
Try asking what I can do."""
# This function is the entrypoint for the agent.
async def entrypoint(ctx: JobContext):
# Create an initial chat context with a system prompt
@ -23,17 +29,47 @@ async def entrypoint(ctx: JobContext):
# Connect to the LiveKit room
await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
# Create a black background with a white circle
width, height = 640, 480
image_np = np.zeros((height, width, 4), dtype=np.uint8)
# Create a white circle
center = (width // 2, height // 2)
radius = 50
y, x = np.ogrid[:height, :width]
mask = ((x - center[0])**2 + (y - center[1])**2) <= radius**2
image_np[mask] = [255, 255, 255, 255] # White color with full opacity
source = rtc.VideoSource(width, height)
track = rtc.LocalVideoTrack.create_video_track("static_image", source)
options = rtc.TrackPublishOptions()
options.source = rtc.TrackSource.SOURCE_CAMERA
publication = await ctx.room.local_participant.publish_track(track, options)
# Function to continuously publish the static image
async def publish_static_image():
while True:
frame = rtc.VideoFrame(width, height, rtc.VideoBufferType.RGBA, image_np.tobytes())
source.capture_frame(frame)
await asyncio.sleep(1/30) # Publish at 30 fps
# Start publishing the static image
asyncio.create_task(publish_static_image())
# VoiceAssistant is a class that creates a full conversational AI agent.
# See https://github.com/livekit/agents/blob/main/livekit-agents/livekit/agents/voice_assistant/assistant.py
# for details on how it works.
interpreter_server_host = os.getenv('INTERPRETER_SERVER_HOST', '0.0.0.0')
interpreter_server_port = os.getenv('INTERPRETER_LIGHT_SERVER_PORT', '8000')
interpreter_server_host = os.getenv('INTERPRETER_SERVER_HOST', 'localhost')
interpreter_server_port = os.getenv('INTERPRETER_SERVER_PORT', '8000')
base_url = f"http://{interpreter_server_host}:{interpreter_server_port}/openai"
# For debugging
# base_url = "http://127.0.0.1:8000/openai"
open_interpreter = openai.LLM(
model="open-interpreter", base_url=base_url
model="open-interpreter", base_url=base_url, api_key="x"
)
assistant = VoiceAssistant(
@ -65,13 +101,20 @@ async def entrypoint(ctx: JobContext):
await asyncio.sleep(1)
# Greets the user with an initial message
await assistant.say("""Hi! You can hold the white circle below to speak to me.
await assistant.say(start_message,
allow_interruptions=True)
Try asking what I can do.""", allow_interruptions=True)
def main(livekit_url):
# Workers have to be run as CLIs right now.
# So we need to simualte running "[this file] dev"
# Modify sys.argv to set the path to this file as the first argument
# and 'dev' as the second argument
sys.argv = [str(__file__), 'dev']
if __name__ == "__main__":
# Initialize the worker with the entrypoint
cli.run_app(
WorkerOptions(entrypoint_fnc=entrypoint, api_key="devkey", api_secret="secret", ws_url=os.getenv("LIVEKIT_URL"))
)
WorkerOptions(entrypoint_fnc=entrypoint, api_key="devkey", api_secret="secret", ws_url=livekit_url)
)
Loading…
Cancel
Save