LiveKit Pipeline Agent (#4)
* init processors * fix image append for chat messages * add pre tts cb * stash non-functional worker changes * add complete flag parsing worker * stash worker changes * update worker.py to handle message context correctly * draft worker -- currently mismanages chat ctx * stash draft delta changes * working worker on push to talk happy path * final working worker on push to talk * refactor append image * updated video frame processing * rm text processor * working draft main * draft working poetry (pull/314/head)
parent
befddaf205
commit
1720b783ce
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,48 @@
|
||||
import aiohttp
|
||||
from typing import Annotated
|
||||
from livekit.agents import llm
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
# Path of the file where tool-call activity is appended.
LOG_FILE_PATH = 'assistant_functions.txt'


def log_message(message: str) -> None:
    """Append a message to the log file with a timestamp.

    Args:
        message: Free-form text to record; one line is written per call.
    """
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    # Explicit encoding so log output is identical across platforms
    # (the original relied on the locale-dependent default encoding).
    with open(LOG_FILE_PATH, 'a', encoding='utf-8') as log_file:
        log_file.write(f"{timestamp} - {message}\n")
|
||||
# Tool container exposed to the LLM; llm.FunctionContext collects the
# @llm.ai_callable-decorated methods into the model's tool list.
class AssistantFnc(llm.FunctionContext):
    # NOTE: the docstring below doubles as the tool description sent to the
    # LLM, so it is part of the runtime contract — do not reword it casually.
    @llm.ai_callable()
    async def get_weather(
        self,
        # Annotated carries the human-readable arg description to the LLM.
        location: Annotated[
            str, llm.TypeInfo(description="The location to get the weather for")
        ],
    ) -> str:
        """Called when the user asks about the weather. This function will return the weather for the given location."""
        log_message(f"getting weather for {location}")
        url = f"https://wttr.in/{location}?format=%C+%t"

        async with aiohttp.ClientSession() as http, http.get(url) as resp:
            log_message(f"response: {resp}")
            # Guard clause: bail out early on any non-OK status.
            if resp.status != 200:
                log_message(f"Failed to get weather data, status code: {resp.status}")
                return f"Failed to get weather data, status code: {resp.status}"

            report = await resp.text()
            reply: str = f"The weather in {location} is {report}."
            log_message(f"content: {reply}")
            # The returned string becomes the tool response the LLM reads.
            return reply
@ -0,0 +1,14 @@
|
||||
from livekit.agents import stt, transcription
|
||||
|
||||
async def _forward_transcription(
    stt_stream: stt.SpeechStream,
    stt_forwarder: transcription.STTSegmentsForwarder,
):
    """Relay STT events to the client forwarder and echo transcripts to stdout."""
    async for event in stt_stream:
        # Always push the event to the client-side forwarder first.
        stt_forwarder.update(event)
        kind = event.type
        if kind == stt.SpeechEventType.FINAL_TRANSCRIPT:
            # Finish the interim line, then print the final transcript.
            print("\n")
            print(" -> ", event.alternatives[0].text)
        elif kind == stt.SpeechEventType.INTERIM_TRANSCRIPT:
            # Stream partial text without a newline so it updates in place.
            print(event.alternatives[0].text, end="")
@ -0,0 +1,50 @@
|
||||
from livekit.rtc import VideoStream
|
||||
from livekit.agents import JobContext
|
||||
from datetime import datetime
|
||||
from livekit.agents.pipeline import VoicePipelineAgent
|
||||
|
||||
|
||||
from livekit.rtc import VideoFrame
|
||||
import asyncio
|
||||
|
||||
# Path of the file where video-processing activity is appended.
LOG_FILE_PATH = 'video_processor.txt'


def log_message(message: str) -> None:
    """Append a message to the log file with a timestamp.

    Args:
        message: Free-form text to record; one line is written per call.
    """
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    # Explicit encoding so log output is identical across platforms
    # (the original relied on the locale-dependent default encoding).
    with open(LOG_FILE_PATH, 'a', encoding='utf-8') as log_file:
        log_file.write(f"{timestamp} - {message}\n")
||||
|
||||
class RemoteVideoProcessor:
    """Processes video frames from a remote participant's video stream.

    Continuously drains ``video_stream`` and caches the most recent frame so
    other coroutines can sample it on demand via :meth:`get_current_frame`.
    """

    def __init__(self, video_stream: VideoStream, job_ctx: JobContext):
        # Source of frame events for one remote participant's track.
        self.video_stream = video_stream
        # Job context held for callers; not read by this class itself.
        self.job_ctx = job_ctx
        # Most recently received frame; None until the first frame arrives.
        self.current_frame: VideoFrame | None = None
        # Serializes access to current_frame between the consumer loop
        # and get_current_frame() callers.
        self.lock = asyncio.Lock()

    async def process_frames(self) -> None:
        """Consume the video stream, caching each frame as it arrives.

        Runs until the stream is exhausted. Per-frame errors are logged and
        skipped so a single bad frame does not stop the whole stream.
        """
        log_message("Starting to process remote video frames.")
        async for frame_event in self.video_stream:
            try:
                video_frame = frame_event.frame
                # frame_event also exposes timestamp_us and rotation; the
                # originals extracted them but never used them, so they are
                # intentionally not read here.
                log_message(f"Received frame: width={video_frame.width}, height={video_frame.height}, type={video_frame.type}")
                async with self.lock:
                    self.current_frame = video_frame
            except Exception as e:
                # Best-effort: record the failure and keep consuming frames.
                log_message(f"Error processing frame: {e}")

    async def get_current_frame(self) -> VideoFrame | None:
        """Return the latest cached VideoFrame, or None if none seen yet."""
        log_message("called get current frame")
        async with self.lock:
            log_message("retrieving current frame: " + str(self.current_frame))
            return self.current_frame
Loading…
Reference in new issue