import asyncio
import threading
import time
import socket
import queue # Import the standard threading Queue
from llama_index.core.chat_engine import SimpleChatEngine
from llama_index.core.storage.chat_store import SimpleChatStore
from llama_index.core.memory import ChatMemoryBuffer
import TTS.tts.utils.text.cleaners as cleaners
import re
# Import necessary modules for STT, LLM, and TTS
from RealtimeSTT import AudioToTextRecorder
from RealtimeTTS import TextToAudioStream, CoquiEngine # You can use another TTS engine if preferred
from llama_index.llms.ollama import Ollama
# Settings for audio socket
AUDIO_SERVER_IP = '0.0.0.0'
AUDIO_SERVER_PORT = 65432
DATA_SERVER_PORT = 8012
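# Port layout: clients receive synthesized speech on AUDIO_SERVER_PORT and
# push raw microphone audio for transcription to DATA_SERVER_PORT.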
# Global variables
recorder = None
prev_text = ""
last_text_change_time = time.time()
text_stable_duration = 1 # Time duration without text changes to trigger LLM
audio_clients = [] # List of connected audio clients
audio_clients_lock = threading.Lock()
is_llm_processing = False
is_interrupted = False # New variable to track interruption
llm_tts_task = None # Task for LLM and TTS processing
loop = None # Event loop
# Function to process detected text
def text_detected(text):
    global prev_text, last_text_change_time, is_llm_processing, is_interrupted, llm_tts_task
    text = text.strip()
    if text != prev_text:
        prev_text = text
        last_text_change_time = time.time()
        print(f"Realtime text: {text}")
        if is_llm_processing:
            is_interrupted = True
            if llm_tts_task and not llm_tts_task.done():
                # This callback runs on the recorder thread, so schedule the
                # cancellation on the event loop's own thread
                loop.call_soon_threadsafe(llm_tts_task.cancel)
                tts_stream.stop()
                print("LLM and TTS have been interrupted due to new user input.")
async def handle_llm_and_tts(prompt):
    global is_llm_processing, is_interrupted, llm_tts_task
    is_llm_processing = True
    is_interrupted = False
    print(f"Sending to LLM: {prompt}")
    q = queue.Queue()

    def llm_streaming():
        response = chat.stream_chat(prompt)
        for completion in response.response_gen:
            if is_interrupted:
                print("\nLLM generation interrupted.")
                break
            completion = cleaners.replace_symbols(completion, lang=None)
            completion = cleaners.remove_aux_symbols(completion)
            completion = re.sub(r"[\*]+", "", completion)
            completion = re.sub(r'[^a-zA-Zа-яА-ЯёЁ0-9\s.,!?;:\'\"\*-]', '', completion)
            completion = re.sub(r'\s+', ' ', completion)
            # Put the cleaned chunk into the queue for the TTS generator
            q.put(completion)
            print(completion, end='', flush=True)
        chat_store.persist(persist_path="~/chat/chat_store.json")
        # Signal that LLM streaming is done
        q.put(None)

    # Start llm_streaming in a separate thread
    threading.Thread(target=llm_streaming, daemon=True).start()

    def text_stream():
        while True:
            if is_interrupted:
                break
            try:
                delta = q.get(timeout=0.1)
                if delta is None:
                    break
                yield delta
            except queue.Empty:
                continue

    tts_stream.feed(text_stream())
    try:
        await play_and_send_audio()
    except asyncio.CancelledError:
        print("LLM and TTS task was cancelled.")
    is_llm_processing = False
async def play_and_send_audio():
    global is_interrupted

    def on_audio_chunk(chunk):
        if is_interrupted:
            return
        with audio_clients_lock:
            # Iterate over a copy so dead sockets can be removed safely
            for client_socket in list(audio_clients):
                try:
                    client_socket.sendall(chunk)
                except Exception as e:
                    print(f"Error sending audio to client: {e}")
                    audio_clients.remove(client_socket)

    # tts_stream.play() blocks until playback finishes, so run it in a worker
    # thread to keep the asyncio event loop responsive
    await asyncio.get_running_loop().run_in_executor(
        None, lambda: tts_stream.play(on_audio_chunk=on_audio_chunk, muted=True)
    )
# Function to start audio socket server
def start_audio_server():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # Reuse address
    server_socket.bind((AUDIO_SERVER_IP, AUDIO_SERVER_PORT))
    server_socket.listen()
    print(f"Audio server started on {AUDIO_SERVER_IP}:{AUDIO_SERVER_PORT}")
    while True:
        client_socket, addr = server_socket.accept()
        print(f"Audio client connected from {addr}")
        # Add client socket to the list with the thread-safe lock
        with audio_clients_lock:
            audio_clients.append(client_socket)
        # Start a thread to handle client disconnection
        threading.Thread(target=handle_client_disconnection, args=(client_socket,), daemon=True).start()
# Function to handle client disconnection
def handle_client_disconnection(client_socket):
    try:
        # Keep the connection open until the client goes away
        while True:
            data = client_socket.recv(1024)
            if not data:
                break
    except Exception as e:
        print(f"Client disconnected: {e}")
    finally:
        with audio_clients_lock:
            if client_socket in audio_clients:
                audio_clients.remove(client_socket)
        client_socket.close()
        print("Client socket closed")
# Function to receive audio data from clients
def start_data_server():
    data_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    data_server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    data_server_socket.bind((AUDIO_SERVER_IP, DATA_SERVER_PORT))
    data_server_socket.listen()
    print(f"Data server started on {AUDIO_SERVER_IP}:{DATA_SERVER_PORT}")
    while True:
        client_socket, addr = data_server_socket.accept()
        print(f"Data client connected from {addr}")
        threading.Thread(target=handle_data_client, args=(client_socket,), daemon=True).start()
# Function to handle a data client
def handle_data_client(client_socket):
    global recorder
    try:
        while True:
            data = client_socket.recv(4096)
            if not data:
                break
            # Feed the raw audio data to the recorder
            recorder.feed_audio(data)
    except Exception as e:
        print(f"Data client error: {e}")
    finally:
        client_socket.close()
        print("Data client socket closed")
def recorder_loop():
    global recorder

    def process_text(text):
        pass  # You can implement any processing here if needed

    try:
        while True:
            recorder.text(process_text)
    except Exception as e:
        print(e)
async def monitor_text_stability():
    global prev_text, last_text_change_time, llm_tts_task, is_interrupted
    while True:
        await asyncio.sleep(0.1)
        if prev_text != "" and time.time() - last_text_change_time >= text_stable_duration:
            text_to_send = prev_text
            prev_text = ""
            # Cancel any ongoing LLM and TTS task
            if llm_tts_task and not llm_tts_task.done():
                is_interrupted = True
                llm_tts_task.cancel()
                tts_stream.stop()
            is_interrupted = False
            # Start a new LLM and TTS task
            llm_tts_task = asyncio.create_task(handle_llm_and_tts(text_to_send))
# Function for main loop
def main():
    global recorder, loop
    # Initialize the event loop
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # Initialize recorder with use_microphone=False
    recorder = AudioToTextRecorder(
        model='large-v2',
        # realtime_model_type='tiny.en',
        # realtime_model_type='medium',
        realtime_model_type='large-v3',
        language='ru',
        input_device_index=1,
        silero_sensitivity=0.05,
        silero_use_onnx=False,
        webrtc_sensitivity=3,
        post_speech_silence_duration=0.7,  # This corresponds to unknown_sentence_detection_pause
        min_length_of_recording=1.1,
        min_gap_between_recordings=0,
        enable_realtime_transcription=True,
        realtime_processing_pause=0.02,
        silero_deactivity_detection=True,
        early_transcription_on_silence=0.2,
        beam_size=5,
        beam_size_realtime=3,
        # initial_prompt="Incomplete thoughts should end with '...'. Examples of complete thoughts: 'The sky is blue.' 'She walked home.' Examples of incomplete thoughts: 'When the sky...' 'Because he...'",
        initial_prompt="",
        wake_words="",
        wake_words_sensitivity=0.5,
        wake_word_timeout=5.0,
        wake_word_activation_delay=20,
        wakeword_backend='none',
        openwakeword_model_paths=None,
        openwakeword_inference_framework='tensorflow',
        wake_word_buffer_duration=1.0,
        use_main_model_for_realtime=False,
        spinner=False,
        use_microphone=False,  # Important: We receive audio from client, not from microphone
        on_realtime_transcription_update=text_detected,  # Assuming make_callback is not used here
        use_extended_logging=False,
    )
    # Start audio server in a separate thread
    audio_server_thread = threading.Thread(target=start_audio_server, daemon=True)
    audio_server_thread.start()
    # Start data server in a separate thread
    data_server_thread = threading.Thread(target=start_data_server, daemon=True)
    data_server_thread.start()
    # Start recorder in a separate thread
    recorder_thread = threading.Thread(target=recorder_loop, daemon=True)
    recorder_thread.start()
    # Schedule the text stability monitoring task
    loop.create_task(monitor_text_stability())
    # Start the event loop
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        print("Server is shutting down...")
    finally:
        # Stop recorder and close resources
        recorder.stop()
        recorder.shutdown()
        loop.stop()
        loop.close()
if __name__ == '__main__':
    chat_store = SimpleChatStore.from_persist_path(
        persist_path="~/chat/chat_store.json"
    )
    chat_memory = ChatMemoryBuffer.from_defaults(
        token_limit=8192,
        chat_store=chat_store,
        chat_store_key="User",
    )
    # Settings for LLM and TTS
    # Initialize the Ollama LLM
    LLM = Ollama(model="gemma2:9b", base_url="http://ollama:11434")
    prompt1 = """
    You are a friendly and helpful female voice assistant. You are aware that you are communicating through voice, so your responses should be clear, concise, and conversational, as if you are having a natural spoken conversation. Use a warm and approachable tone. Do not use any special symbols or formatting, such as lists. Just speak as if it's a regular dialogue. Always be ready to assist with follow-up questions or actions.
    Remember to keep your responses short and engaging, and always be ready to assist further if needed. Avoid using any special symbols or formatting to ensure smooth text-to-speech conversion.
    """
    chat = SimpleChatEngine.from_defaults(llm=LLM, memory=chat_memory, system_prompt=prompt1)
    # Initialize the TTS engine
    # TTS_ENGINE = CoquiEngine(voice="Alma María")
    TTS_ENGINE = CoquiEngine(voice="Chandra MacFarland")
    tts_stream = TextToAudioStream(TTS_ENGINE, muted=True)
    main()
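
# ---------------------------------------------------------------------------
# Example client (separate script) — a minimal sketch, not part of this server.
# Assumptions not confirmed by this file: the recorder expects raw 16-bit mono
# PCM at 16 kHz, SERVER_HOST is where this server runs, and 'mic_capture.raw'
# is a hypothetical stand-in for a live microphone capture.
#
#   import socket
#   import threading
#
#   SERVER_HOST = '127.0.0.1'  # hypothetical address of this server
#
#   def send_microphone_audio():
#       # Stream raw PCM chunks to the data server (DATA_SERVER_PORT = 8012)
#       with socket.create_connection((SERVER_HOST, 8012)) as sock:
#           with open('mic_capture.raw', 'rb') as pcm:
#               while chunk := pcm.read(4096):
#                   sock.sendall(chunk)
#
#   def receive_tts_audio():
#       # Receive synthesized audio from the audio server (AUDIO_SERVER_PORT = 65432)
#       with socket.create_connection((SERVER_HOST, 65432)) as sock:
#           with open('assistant_reply.raw', 'wb') as out:
#               while data := sock.recv(4096):
#                   out.write(data)
#
#   threading.Thread(target=receive_tts_audio, daemon=True).start()
#   send_microphone_audio()
# ---------------------------------------------------------------------------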