"""Realtime voice-assistant server.

Receives raw audio from network clients (data socket), transcribes it with
RealtimeSTT, sends stabilized utterances to an Ollama LLM, streams the reply
through Coqui TTS, and pushes the synthesized audio back to connected
playback clients (audio socket).  New user speech interrupts any in-flight
LLM/TTS generation.
"""

import asyncio
import os
import queue  # Standard thread-safe queue for the LLM -> TTS hand-off
import re
import socket
import threading
import time

import TTS.tts.utils.text.cleaners as cleaners
from llama_index.core.chat_engine import SimpleChatEngine
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.storage.chat_store import SimpleChatStore
from llama_index.llms.ollama import Ollama

# STT and TTS back-ends
from RealtimeSTT import AudioToTextRecorder
from RealtimeTTS import TextToAudioStream, CoquiEngine  # You can use another TTS engine if preferred

# Settings for audio socket
AUDIO_SERVER_IP = '0.0.0.0'
AUDIO_SERVER_PORT = 65432
DATA_SERVER_PORT = 8012

# Fix: '~' is not expanded by open(); expand it once, use everywhere.
CHAT_STORE_PATH = os.path.expanduser("~/chat/chat_store.json")

# Global state shared between worker threads and the asyncio loop
recorder = None
prev_text = ""
last_text_change_time = time.time()
text_stable_duration = 1  # Seconds without text changes before we query the LLM
audio_clients = []        # List of connected audio playback client sockets
audio_clients_lock = threading.Lock()
is_llm_processing = False
is_interrupted = False    # Set when new user speech should abort LLM/TTS
llm_tts_task = None       # asyncio.Task running handle_llm_and_tts
loop = None               # The asyncio event loop (owned by the main thread)


def text_detected(text):
    """Realtime-transcription callback (runs on a recorder worker thread).

    Tracks the latest partial transcript and, if the LLM/TTS pipeline is
    currently speaking, interrupts it because the user started talking again.
    """
    global prev_text, last_text_change_time, is_llm_processing, is_interrupted, llm_tts_task
    text = text.strip()
    if text != prev_text:
        prev_text = text
        last_text_change_time = time.time()
        print(f"Realtime text: {text}")
        if is_llm_processing:
            is_interrupted = True
            if llm_tts_task and not llm_tts_task.done():
                # Fix: Task.cancel() is not thread-safe; this callback runs on
                # a recorder thread, so marshal the cancel onto the loop.
                if loop is not None:
                    loop.call_soon_threadsafe(llm_tts_task.cancel)
            tts_stream.stop()
            print("LLM and TTS have been interrupted due to new user input.")


async def handle_llm_and_tts(prompt):
    """Stream an LLM reply for *prompt* and speak it through the TTS stream.

    The LLM streams tokens on a worker thread into a queue; a generator
    drains the queue and feeds the TTS engine.  Both sides honor the
    ``is_interrupted`` flag so new user speech can abort generation.
    """
    global is_llm_processing, is_interrupted, llm_tts_task
    is_llm_processing = True
    is_interrupted = False
    print(f"Sending to LLM: {prompt}")

    q = queue.Queue()

    def llm_streaming():
        # Runs on its own thread: the llama_index stream_chat call blocks.
        response = chat.stream_chat(prompt)
        for completion in response.response_gen:
            if is_interrupted:
                print("\nLLM generation interrupted.")
                break
            # Sanitize the token so the TTS engine gets plain speakable text.
            completion = cleaners.replace_symbols(completion, lang=None)
            completion = cleaners.remove_aux_symbols(completion)
            completion = re.sub(r"[\*]+", "", completion)
            completion = re.sub(r'[^a-zA-Zа-яА-ЯёЁ0-9\s.,!?;:\'\"\*-]', '', completion)
            completion = re.sub(r'\s+', ' ', completion)
            # Put completion into the queue
            q.put(completion)
            print(completion, end='', flush=True)
        # Persist the conversation once the stream finishes (or is aborted).
        chat_store.persist(persist_path=CHAT_STORE_PATH)
        # Signal that LLM streaming is done
        q.put(None)

    # Start llm_streaming in a separate thread
    threading.Thread(target=llm_streaming, daemon=True).start()

    def text_stream():
        # Generator consumed by the TTS engine; ends on interrupt or sentinel.
        while True:
            if is_interrupted:
                break
            try:
                delta = q.get(timeout=0.1)
                if delta is None:
                    break
                yield delta
            except queue.Empty:
                continue

    tts_stream.feed(text_stream())
    try:
        await play_and_send_audio()
    except asyncio.CancelledError:
        print("LLM and TTS task was cancelled.")
    is_llm_processing = False


async def play_and_send_audio():
    """Play the fed TTS stream, fanning audio chunks out to all clients."""
    global is_interrupted

    def on_audio_chunk(chunk):
        if is_interrupted:
            return
        with audio_clients_lock:
            # Fix: never mutate audio_clients while iterating it; collect
            # dead sockets and remove them afterwards.
            dead = []
            for client_socket in audio_clients:
                try:
                    client_socket.sendall(chunk)
                except Exception as e:
                    print(f"Error sending audio to client: {e}")
                    dead.append(client_socket)
            for client_socket in dead:
                audio_clients.remove(client_socket)

    # Fix: tts_stream.play() blocks until playback ends; run it in a worker
    # thread so the event loop (and monitor_text_stability) keeps running,
    # allowing interruptions to be scheduled during playback.
    await asyncio.get_running_loop().run_in_executor(
        None, lambda: tts_stream.play(on_audio_chunk=on_audio_chunk, muted=True)
    )


# Function to start audio socket server
def start_audio_server():
    """Accept playback clients and register their sockets (blocking loop)."""
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # Reuse address
    server_socket.bind((AUDIO_SERVER_IP, AUDIO_SERVER_PORT))
    server_socket.listen()
    print(f"Audio server started on {AUDIO_SERVER_IP}:{AUDIO_SERVER_PORT}")
    while True:
        client_socket, addr = server_socket.accept()
        print(f"Audio client connected from {addr}")
        # Add client socket to list with thread-safe lock
        with audio_clients_lock:
            audio_clients.append(client_socket)
        # Start a thread to handle client disconnection
        threading.Thread(target=handle_client_disconnection, args=(client_socket,), daemon=True).start()


# Function to handle client disconnection
def handle_client_disconnection(client_socket):
    """Block on the client socket until it closes, then deregister it."""
    try:
        # Keep the connection open; recv() returning b'' means disconnect.
        while True:
            data = client_socket.recv(1024)
            if not data:
                break
    except Exception as e:
        print(f"Client disconnected: {e}")
    finally:
        with audio_clients_lock:
            if client_socket in audio_clients:
                audio_clients.remove(client_socket)
        client_socket.close()
        print("Client socket closed")


# Function to receive audio data from clients
def start_data_server():
    """Accept microphone-data clients and spawn a handler per connection."""
    data_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    data_server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    data_server_socket.bind((AUDIO_SERVER_IP, DATA_SERVER_PORT))
    data_server_socket.listen()
    print(f"Data server started on {AUDIO_SERVER_IP}:{DATA_SERVER_PORT}")
    while True:
        client_socket, addr = data_server_socket.accept()
        print(f"Data client connected from {addr}")
        threading.Thread(target=handle_data_client, args=(client_socket,), daemon=True).start()


# Function to handle data client
def handle_data_client(client_socket):
    """Forward raw audio bytes from a client into the STT recorder."""
    global recorder
    try:
        while True:
            data = client_socket.recv(4096)
            if not data:
                break
            # Feed data to the recorder
            recorder.feed_audio(data)
    except Exception as e:
        print(f"Data client error: {e}")
    finally:
        client_socket.close()
        print("Data client socket closed")


def recorder_loop():
    """Continuously pump the recorder; transcription updates arrive via callback."""
    global recorder

    def process_text(text):
        pass  # You can implement any processing here if needed

    try:
        while True:
            recorder.text(process_text)
    except Exception as e:
        print(e)


async def monitor_text_stability():
    """Poll the partial transcript; once it is stable, dispatch it to the LLM."""
    global prev_text, last_text_change_time, llm_tts_task, is_interrupted
    while True:
        await asyncio.sleep(0.1)
        if prev_text != "" and time.time() - last_text_change_time >= text_stable_duration:
            text_to_send = prev_text
            prev_text = ""
            # Cancel any ongoing LLM and TTS task
            if llm_tts_task and not llm_tts_task.done():
                is_interrupted = True
                llm_tts_task.cancel()
                tts_stream.stop()
            is_interrupted = False
            # Start a new LLM and TTS task
            llm_tts_task = asyncio.create_task(handle_llm_and_tts(text_to_send))


# Function for main loop
def main():
    """Wire up recorder, socket servers, and the asyncio loop, then run forever."""
    global recorder, loop
    # Initialize the event loop
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Initialize recorder with use_microphone=False
    recorder = AudioToTextRecorder(
        model='large-v2',
        # realtime_model_type='tiny.en',
        # realtime_model_type='medium',
        realtime_model_type='large-v3',
        language='ru',
        input_device_index=1,
        silero_sensitivity=0.05,
        silero_use_onnx=False,
        webrtc_sensitivity=3,
        post_speech_silence_duration=0.7,  # This corresponds to unknown_sentence_detection_pause
        min_length_of_recording=1.1,
        min_gap_between_recordings=0,
        enable_realtime_transcription=True,
        realtime_processing_pause=0.02,
        silero_deactivity_detection=True,
        early_transcription_on_silence=0.2,
        beam_size=5,
        beam_size_realtime=3,
        # initial_prompt="Incomplete thoughts should end with '...'. Examples of complete thoughts: 'The sky is blue.' 'She walked home.' Examples of incomplete thoughts: 'When the sky...' 'Because he...'",
        initial_prompt="",
        wake_words="",
        wake_words_sensitivity=0.5,
        wake_word_timeout=5.0,
        wake_word_activation_delay=20,
        wakeword_backend='none',
        openwakeword_model_paths=None,
        openwakeword_inference_framework='tensorflow',
        wake_word_buffer_duration=1.0,
        use_main_model_for_realtime=False,
        spinner=False,
        use_microphone=False,  # Important: We receive audio from client, not from microphone
        on_realtime_transcription_update=text_detected,  # Assuming make_callback is not used here
        use_extended_logging=False,
    )

    # Start audio server in a separate thread
    audio_server_thread = threading.Thread(target=start_audio_server, daemon=True)
    audio_server_thread.start()

    # Start data server in a separate thread
    data_server_thread = threading.Thread(target=start_data_server, daemon=True)
    data_server_thread.start()

    # Start recorder in a separate thread
    recorder_thread = threading.Thread(target=recorder_loop, daemon=True)
    recorder_thread.start()

    # Schedule the text stability monitoring task
    loop.create_task(monitor_text_stability())

    # Start the event loop
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        print("Server is shutting down...")
    finally:
        # Stop recorder and close resources
        recorder.stop()
        recorder.shutdown()
        loop.stop()
        loop.close()


if __name__ == '__main__':
    chat_store = SimpleChatStore.from_persist_path(
        persist_path=CHAT_STORE_PATH
    )
    chat_memory = ChatMemoryBuffer.from_defaults(
        token_limit=8192,
        chat_store=chat_store,
        chat_store_key="User",
    )

    # Settings for LLM and TTS
    # Initialize Ollama LLM
    LLM = Ollama(model="gemma2:9b", base_url="http://ollama:11434")
    prompt1 = """
You are a friendly and helpful female voice assistant. You are aware that you are communicating through voice, so your responses should be clear, concise, and conversational, as if you are having a natural spoken conversation. Use a warm and approachable tone. Do not use any special symbols or formatting, such as lists. Just speak as if it's a regular dialogue. Always be ready to assist with follow-up questions or actions. Here are examples of how you might respond: Remember to keep your responses short and engaging, and always be ready to assist further if needed. Avoid using any special symbols or formatting to ensure smooth text-to-speech conversion.
"""
    chat = SimpleChatEngine.from_defaults(llm=LLM, memory=chat_memory, system_prompt=prompt1)

    # Initialize TTS engine
    # TTS_ENGINE = CoquiEngine(voice="Alma María")
    TTS_ENGINE = CoquiEngine(voice="Chandra MacFarland")
    tts_stream = TextToAudioStream(TTS_ENGINE, muted=True)

    main()