import asyncio
import threading
import time
import socket
import queue  # Standard thread-safe queue, used to hand LLM output to the TTS generator
from llama_index.core.chat_engine import SimpleChatEngine
from llama_index.core.storage.chat_store import SimpleChatStore
from llama_index.core.memory import ChatMemoryBuffer
import TTS.tts.utils.text.cleaners as cleaners
import re

# Import necessary modules for STT, LLM, and TTS
from RealtimeSTT import AudioToTextRecorder
from RealtimeTTS import TextToAudioStream, CoquiEngine  # You can use another TTS engine if preferred
from llama_index.llms.ollama import Ollama

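# ---------------------------------------------------------------------------
# Overview (summarized from the code below):
#   * A data socket (DATA_SERVER_PORT) receives raw microphone audio from a
#     client and feeds it into RealtimeSTT.
#   * Once the realtime transcription stops changing for text_stable_duration
#     seconds, the text is sent to the Ollama LLM via llama_index.
#   * The streamed LLM reply is cleaned and spoken by RealtimeTTS (Coqui), and
#     the resulting audio chunks are broadcast to every client connected to
#     the audio socket (AUDIO_SERVER_PORT).
#   * New user speech while a reply is playing interrupts the LLM/TTS task.
# ---------------------------------------------------------------------------
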
# Settings for audio socket
|
|
|
AUDIO_SERVER_IP = '0.0.0.0'
|
|
|
AUDIO_SERVER_PORT = 65432
|
|
|
DATA_SERVER_PORT = 8012
|
|
|
|
|
|
# Global variables
|
|
|
recorder = None
|
|
|
prev_text = ""
|
|
|
last_text_change_time = time.time()
|
|
|
text_stable_duration = 1 # Time duration without text changes to trigger LLM
|
|
|
audio_clients = [] # List of connected audio clients
|
|
|
audio_clients_lock = threading.Lock()
|
|
|
is_llm_processing = False
|
|
|
is_interrupted = False # New variable to track interruption
|
|
|
llm_tts_task = None # Task for LLM and TTS processing
|
|
|
loop = None # Event loop
|
|
|
|
|
|
# Callback for realtime transcription updates (runs in the recorder thread)
def text_detected(text):
    global prev_text, last_text_change_time, is_llm_processing, is_interrupted, llm_tts_task
    text = text.strip()
    if text != prev_text:
        prev_text = text
        last_text_change_time = time.time()
        print(f"Realtime text: {text}")
        if is_llm_processing:
            # New user speech arrived while a response is playing: interrupt it.
            is_interrupted = True
            if llm_tts_task and not llm_tts_task.done():
                # This callback runs outside the event loop's thread, so cancel
                # the asyncio task via the loop instead of calling cancel() directly.
                loop.call_soon_threadsafe(llm_tts_task.cancel)
            tts_stream.stop()
            print("LLM and TTS have been interrupted due to new user input.")


async def handle_llm_and_tts(prompt):
    global is_llm_processing, is_interrupted, llm_tts_task
    is_llm_processing = True
    is_interrupted = False
    print(f"Sending to LLM: {prompt}")

    q = queue.Queue()

    def llm_streaming():
        response = chat.stream_chat(prompt)
        for completion in response.response_gen:
            if is_interrupted:
                print("\nLLM generation interrupted.")
                break
            # Clean the streamed text so it is safe to feed to the TTS engine
            completion = cleaners.replace_symbols(completion, lang=None)
            completion = cleaners.remove_aux_symbols(completion)
            completion = re.sub(r"[\*]+", "", completion)
            completion = re.sub(r'[^a-zA-Zа-яА-ЯёЁ0-9\s.,!?;:\'\"\*-]', '', completion)
            completion = re.sub(r'\s+', ' ', completion)

            # Put the cleaned chunk into the queue
            q.put(completion)
            print(completion, end='', flush=True)
        chat_store.persist(persist_path="~/chat/chat_store.json")
        # Signal that LLM streaming is done
        q.put(None)

    # Run the blocking LLM stream in a separate thread
    threading.Thread(target=llm_streaming, daemon=True).start()

    def text_stream():
        # Generator consumed by RealtimeTTS: yields cleaned LLM chunks as they arrive
        while True:
            if is_interrupted:
                break
            try:
                delta = q.get(timeout=0.1)
                if delta is None:
                    break
                yield delta
            except queue.Empty:
                continue

    tts_stream.feed(text_stream())
    try:
        await play_and_send_audio()
    except asyncio.CancelledError:
        print("LLM and TTS task was cancelled.")
    finally:
        # Reset the flag even if playback fails or is cancelled
        is_llm_processing = False


async def play_and_send_audio():
    global is_interrupted

    def on_audio_chunk(chunk):
        if is_interrupted:
            return
        with audio_clients_lock:
            # Iterate over a copy so failed clients can be removed safely
            for client_socket in list(audio_clients):
                try:
                    client_socket.sendall(chunk)
                except Exception as e:
                    print(f"Error sending audio to client: {e}")
                    audio_clients.remove(client_socket)

    # tts_stream.play() blocks until playback finishes, so run it in a worker
    # thread to keep the event loop (and the text-stability monitor) responsive.
    await asyncio.to_thread(tts_stream.play, on_audio_chunk=on_audio_chunk, muted=True)


# Function to start the audio socket server
def start_audio_server():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # Reuse address
    server_socket.bind((AUDIO_SERVER_IP, AUDIO_SERVER_PORT))
    server_socket.listen()
    print(f"Audio server started on {AUDIO_SERVER_IP}:{AUDIO_SERVER_PORT}")

    while True:
        client_socket, addr = server_socket.accept()
        print(f"Audio client connected from {addr}")
        # Add client socket to the list with the thread-safe lock
        with audio_clients_lock:
            audio_clients.append(client_socket)
        # Start a thread to handle client disconnection
        threading.Thread(target=handle_client_disconnection, args=(client_socket,), daemon=True).start()

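# NOTE: audio clients receive the raw audio chunks exactly as the TTS engine
# produces them; no header or framing is added, so the client must already
# know the sample format of the configured engine (check your RealtimeTTS /
# Coqui setup for the exact sample rate and bit depth).
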
# Function to handle client disconnection
def handle_client_disconnection(client_socket):
    try:
        # Keep the connection open
        while True:
            data = client_socket.recv(1024)
            if not data:
                break
    except Exception as e:
        print(f"Client disconnected: {e}")
    finally:
        with audio_clients_lock:
            if client_socket in audio_clients:
                audio_clients.remove(client_socket)
        client_socket.close()
        print("Client socket closed")


# Function to receive audio data from clients
def start_data_server():
    data_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    data_server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    data_server_socket.bind((AUDIO_SERVER_IP, DATA_SERVER_PORT))
    data_server_socket.listen()
    print(f"Data server started on {AUDIO_SERVER_IP}:{DATA_SERVER_PORT}")

    while True:
        client_socket, addr = data_server_socket.accept()
        print(f"Data client connected from {addr}")
        threading.Thread(target=handle_data_client, args=(client_socket,), daemon=True).start()

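# NOTE: incoming data is forwarded to recorder.feed_audio() unchanged, so the
# client is expected to stream audio in the format RealtimeSTT accepts
# (16 kHz, 16-bit mono PCM by default — confirm against the RealtimeSTT
# documentation for your version).
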
# Function to handle a data client
def handle_data_client(client_socket):
    global recorder
    try:
        while True:
            data = client_socket.recv(4096)
            if not data:
                break
            # Feed data to the recorder
            recorder.feed_audio(data)
    except Exception as e:
        print(f"Data client error: {e}")
    finally:
        client_socket.close()
        print("Data client socket closed")


def recorder_loop():
    global recorder

    def process_text(text):
        pass  # Final transcriptions could be post-processed here if needed

    try:
        while True:
            # Blocks until a full utterance is transcribed, then invokes process_text
            recorder.text(process_text)
    except Exception as e:
        print(e)


async def monitor_text_stability():
    global prev_text, last_text_change_time, llm_tts_task, is_interrupted
    while True:
        await asyncio.sleep(0.1)
        # Once the realtime text has stopped changing for text_stable_duration
        # seconds, treat it as a finished utterance and send it to the LLM.
        if prev_text != "" and time.time() - last_text_change_time >= text_stable_duration:
            text_to_send = prev_text
            prev_text = ""
            # Cancel any ongoing LLM and TTS task
            if llm_tts_task and not llm_tts_task.done():
                is_interrupted = True
                llm_tts_task.cancel()
                tts_stream.stop()
                is_interrupted = False
            # Start a new LLM and TTS task
            llm_tts_task = asyncio.create_task(handle_llm_and_tts(text_to_send))


# Main entry point: wires up the recorder, socket servers, and event loop
def main():
    global recorder, loop
    # Initialize the event loop
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Initialize the recorder with use_microphone=False
    recorder = AudioToTextRecorder(
        model='large-v2',
        # realtime_model_type='tiny.en',
        # realtime_model_type='medium',
        realtime_model_type='large-v3',
        language='ru',
        input_device_index=1,
        silero_sensitivity=0.05,
        silero_use_onnx=False,
        webrtc_sensitivity=3,
        post_speech_silence_duration=0.7,  # Corresponds to unknown_sentence_detection_pause
        min_length_of_recording=1.1,
        min_gap_between_recordings=0,
        enable_realtime_transcription=True,
        realtime_processing_pause=0.02,
        silero_deactivity_detection=True,
        early_transcription_on_silence=0.2,
        beam_size=5,
        beam_size_realtime=3,
        # initial_prompt="Incomplete thoughts should end with '...'. Examples of complete thoughts: 'The sky is blue.' 'She walked home.' Examples of incomplete thoughts: 'When the sky...' 'Because he...'",
        initial_prompt="",
        wake_words="",
        wake_words_sensitivity=0.5,
        wake_word_timeout=5.0,
        wake_word_activation_delay=20,
        wakeword_backend='none',
        openwakeword_model_paths=None,
        openwakeword_inference_framework='tensorflow',
        wake_word_buffer_duration=1.0,
        use_main_model_for_realtime=False,
        spinner=False,
        use_microphone=False,  # Important: audio is received from the client, not from a local microphone
        on_realtime_transcription_update=text_detected,  # Callback for realtime transcription updates
        use_extended_logging=False,
    )

    # Start the audio server in a separate thread
    audio_server_thread = threading.Thread(target=start_audio_server, daemon=True)
    audio_server_thread.start()

    # Start the data server in a separate thread
    data_server_thread = threading.Thread(target=start_data_server, daemon=True)
    data_server_thread.start()

    # Start the recorder in a separate thread
    recorder_thread = threading.Thread(target=recorder_loop, daemon=True)
    recorder_thread.start()

    # Schedule the text stability monitoring task
    loop.create_task(monitor_text_stability())

    # Start the event loop
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        print("Server is shutting down...")
    finally:
        # Stop the recorder and close resources
        recorder.stop()
        recorder.shutdown()
        loop.stop()
        loop.close()


if __name__ == '__main__':

    chat_store = SimpleChatStore.from_persist_path(
        persist_path="~/chat/chat_store.json"
    )

    chat_memory = ChatMemoryBuffer.from_defaults(
        token_limit=8192,
        chat_store=chat_store,
        chat_store_key="User",
    )

    # Settings for LLM and TTS
    # Initialize the Ollama LLM
    LLM = Ollama(model="gemma2:9b", base_url="http://ollama:11434")

    prompt1 = """
You are a friendly and helpful female voice assistant. You are aware that you are communicating through voice, so your responses should be clear, concise, and conversational, as if you are having a natural spoken conversation. Use a warm and approachable tone. Do not use any special symbols or formatting, such as lists. Just speak as if it's a regular dialogue. Always be ready to assist with follow-up questions or actions.
Remember to keep your responses short and engaging, and always be ready to assist further if needed. Avoid using any special symbols or formatting to ensure smooth text-to-speech conversion.
"""

    chat = SimpleChatEngine.from_defaults(llm=LLM, memory=chat_memory, system_prompt=prompt1)

    # Initialize the TTS engine
    # TTS_ENGINE = CoquiEngine(voice="Alma María")
    TTS_ENGINE = CoquiEngine(voice="Chandra MacFarland")
    tts_stream = TextToAudioStream(TTS_ENGINE, muted=True)

    main()
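
# ---------------------------------------------------------------------------
# Illustrative client sketch (not part of this script; the host name and the
# audio formats are assumptions — adapt them to your client and verify the
# formats against your RealtimeSTT/RealtimeTTS setup):
#
#   import socket
#   SERVER = "server-host"                                  # hypothetical host
#   data_sock = socket.create_connection((SERVER, 8012))    # mic audio -> STT
#   audio_sock = socket.create_connection((SERVER, 65432))  # TTS audio <- server
#   # stream microphone chunks:        data_sock.sendall(mic_chunk)
#   # read and play TTS audio chunks:  chunk = audio_sock.recv(4096)
# ---------------------------------------------------------------------------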