@ -24,9 +24,12 @@ load_dotenv()
# Define the path to the log file
LOG_FILE_PATH = ' worker.txt '
DEBUG = os . getenv ( ' DEBUG ' , ' false ' ) . lower ( ) == ' true '
def log_message ( message : str ) :
""" Append a message to the log file with a timestamp. """
if not DEBUG :
return
timestamp = datetime . now ( ) . strftime ( ' % Y- % m- %d % H: % M: % S ' )
with open ( LOG_FILE_PATH , ' a ' ) as log_file :
log_file . write ( f " { timestamp } - { message } \n " )
@ -101,8 +104,6 @@ async def entrypoint(ctx: JobContext):
tts_provider = os . getenv ( ' 01_TTS ' , ' ' ) . lower ( )
stt_provider = os . getenv ( ' 01_STT ' , ' ' ) . lower ( )
tts_provider = ' elevenlabs '
stt_provider = ' deepgram '
# Add plugins here
if tts_provider == ' openai ' :
@ -170,6 +171,8 @@ async def entrypoint(ctx: JobContext):
else :
async def process_query ( ) :
log_message ( f " [before_llm_cb] processing query in VAD with chat_ctx: { chat_ctx } " )
if remote_video_processor and not video_muted :
video_frame = await remote_video_processor . get_current_frame ( )
if video_frame :
@ -185,7 +188,7 @@ async def entrypoint(ctx: JobContext):
return process_query ( )
############################################################
# on_message_received implementation
# on_message_received helper
############################################################
async def _on_message_received ( msg : str ) :
nonlocal push_to_talk
@ -318,9 +321,39 @@ async def entrypoint(ctx: JobContext):
video_muted = False
log_message ( f " Track unmuted: { publication . kind } " )
############################################################
# on data received callback
############################################################
async def _publish_clear_chat ( ) :
local_participant = ctx . room . local_participant
await local_participant . publish_data ( payload = " {CLEAR_CHAT} " , topic = " chat_context " )
log_message ( " sent {CLEAR_CHAT} to chat_context for client to clear " )
await assistant . say ( assistant . start_message )
@ctx.room.on ( " data_received " )
def on_data_received ( data : rtc . DataPacket ) :
decoded_data = data . data . decode ( )
log_message ( f " received data from { data . topic } : { decoded_data } " )
if data . topic == " chat_context " and decoded_data == " {CLEAR_CHAT} " :
assistant . chat_ctx . messages . clear ( )
assistant . chat_ctx . append (
role = " system " ,
text = (
" Only take into context the user ' s image if their message is relevant or pertaining to the image. Otherwise just keep in context that the image is present but do not acknowledge or mention it in your response. "
) ,
)
log_message ( f " cleared chat_ctx " )
log_message ( f " chat_ctx is now { assistant . chat_ctx } " )
asyncio . create_task ( _publish_clear_chat ( ) )
############################################################
# Start the voice assistant with the LiveKit room
############################################################
assistant = VoicePipelineAgent (
vad = silero . VAD . load ( ) ,
stt = stt ,
@ -336,6 +369,21 @@ async def entrypoint(ctx: JobContext):
# Greets the user with an initial message
await assistant . say ( start_message , allow_interruptions = True )
############################################################
# wait for the voice assistant to finish
############################################################
@assistant.on ( " agent_started_speaking " )
def on_agent_started_speaking ( ) :
asyncio . create_task ( ctx . room . local_participant . publish_data ( payload = " {AGENT_STARTED_SPEAKING} " , topic = " agent_state " ) )
log_message ( " Agent started speaking " )
return
@assistant.on ( " agent_stopped_speaking " )
def on_agent_stopped_speaking ( ) :
asyncio . create_task ( ctx . room . local_participant . publish_data ( payload = " {AGENT_STOPPED_SPEAKING} " , topic = " agent_state " ) )
log_message ( " Agent stopped speaking " )
return
def main ( livekit_url : str ) :
# Workers have to be run as CLIs right now.