Cleaned up starting logic

pull/299/head
killian 5 months ago
parent aa637d53b5
commit 52d88fd72c

software/poetry.lock generated

File diff suppressed because it is too large

@@ -27,6 +27,10 @@ class Device:
try:
self.websocket = await websockets.connect(f"ws://{self.server_url}")
print("Connected to server.")
# Send auth, which the server requires (docs.openinterpreter.com/server/usage)
await self.websocket.send(json.dumps({"auth": True}))
return
except ConnectionRefusedError:
if attempt % 4 == 0:
@@ -41,7 +45,7 @@ class Device:
try:
# Send start flag
await self.websocket.send(json.dumps({"role": "user", "type": "audio", "format": "bytes.wav", "start": True}))
#print("Sending audio start message")
# print("Sending audio start message")
while self.recording:
data = self.input_stream.read(CHUNK, exception_on_overflow=False)
@@ -49,7 +53,7 @@ class Device:
# Send stop flag
await self.websocket.send(json.dumps({"role": "user", "type": "audio", "format": "bytes.wav", "end": True}))
#print("Sending audio end message")
# print("Sending audio end message")
except Exception as e:
print(f"Error in send_audio: {e}")
await asyncio.sleep(0.01)
@@ -65,14 +69,14 @@ class Device:
await self.connect_with_retry()
def on_press(self, key):
if key == keyboard.Key.space and not self.recording:
if key == keyboard.Key.ctrl and not self.recording:
# print("Ctrl pressed, starting recording")
print("\n")
self.spinner.start()
self.recording = True
def on_release(self, key):
if key == keyboard.Key.space:
if key == keyboard.Key.ctrl:
self.spinner.stop()
# print("Ctrl released, stopping recording")
self.recording = False
@@ -82,7 +86,7 @@ class Device:
async def main(self):
await self.connect_with_retry()
print("Hold spacebar to record. Press 'CTRL-C' to quit.")
print("Hold CTRL to record. Press 'CTRL-C' to quit.")
listener = keyboard.Listener(on_press=self.on_press, on_release=self.on_release)
listener.start()
await asyncio.gather(self.send_audio(), self.receive_audio())

@@ -0,0 +1,101 @@
import asyncio
import websockets
import pyaudio
from pynput import keyboard
import json
from yaspin import yaspin
CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 1
RECORDING_RATE = 16000
PLAYBACK_RATE = 24000
class Device:
def __init__(self):
self.server_url = "0.0.0.0:10001"
self.p = pyaudio.PyAudio()
self.websocket = None
self.recording = False
self.input_stream = None
self.output_stream = None
self.spinner = yaspin()
self.play_audio = True
async def connect_with_retry(self, max_retries=50, retry_delay=2):
for attempt in range(max_retries):
try:
self.websocket = await websockets.connect(f"ws://{self.server_url}")
print("Connected to server.")
# Send auth, which the server requires (docs.openinterpreter.com/server/usage)
await self.websocket.send(json.dumps({"auth": True}))
return
except ConnectionRefusedError:
if attempt % 4 == 0:
print(f"Waiting for the server to be ready...")
await asyncio.sleep(retry_delay)
raise Exception("Failed to connect to the server after multiple attempts")
async def send_audio(self):
self.input_stream = self.p.open(format=FORMAT, channels=CHANNELS, rate=RECORDING_RATE, input=True, frames_per_buffer=CHUNK)
while True:
if self.recording:
try:
# Send start flag
await self.websocket.send(json.dumps({"role": "user", "type": "audio", "format": "bytes.wav", "start": True}))
# print("Sending audio start message")
while self.recording:
data = self.input_stream.read(CHUNK, exception_on_overflow=False)
await self.websocket.send(data)
# Send stop flag
await self.websocket.send(json.dumps({"role": "user", "type": "audio", "format": "bytes.wav", "end": True}))
# print("Sending audio end message")
except Exception as e:
print(f"Error in send_audio: {e}")
await asyncio.sleep(0.01)
async def receive_audio(self):
self.output_stream = self.p.open(format=FORMAT, channels=CHANNELS, rate=PLAYBACK_RATE, output=True, frames_per_buffer=CHUNK)
while True:
try:
data = await self.websocket.recv()
if self.play_audio and isinstance(data, bytes) and not self.recording:
self.output_stream.write(data)
except Exception as e:
await self.connect_with_retry()
def on_press(self, key):
if key == keyboard.Key.ctrl and not self.recording:
# print("Ctrl pressed, starting recording")
print("\n")
self.spinner.start()
self.recording = True
def on_release(self, key):
if key == keyboard.Key.ctrl: # TODO: Pass in hotkey
self.spinner.stop()
# print("Ctrl released, stopping recording")
self.recording = False
# elif key == keyboard.Key.esc:
# print("Esc pressed, stopping the program")
# return False
async def main(self):
await self.connect_with_retry()
print("Hold CTRL to speak to the assistant. Press 'CTRL-C' to quit.")
listener = keyboard.Listener(on_press=self.on_press, on_release=self.on_release)
listener.start()
await asyncio.gather(self.send_audio(), self.receive_audio())
def start(self):
asyncio.run(self.main())
def run(server_url, debug):
device = Device()
device.server_url = server_url
device.debug = debug
device.start()
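For reference, the websocket protocol this client speaks is small enough to exercise by hand. Below is a minimal sketch under the same assumptions as the diff (default host/port, message shapes copied verbatim from the code above); the WAV file name is hypothetical:

```python
import asyncio
import json

import websockets


async def send_wav(path, server_url="0.0.0.0:10001"):
    async with websockets.connect(f"ws://{server_url}") as ws:
        # Authenticate first; the server requires it
        # (docs.openinterpreter.com/server/usage).
        await ws.send(json.dumps({"auth": True}))
        # Start flag, raw audio bytes, then end flag.
        await ws.send(json.dumps({"role": "user", "type": "audio", "format": "bytes.wav", "start": True}))
        with open(path, "rb") as f:
            await ws.send(f.read())
        await ws.send(json.dumps({"role": "user", "type": "audio", "format": "bytes.wav", "end": True}))
        # Print whatever the server streams back.
        async for message in ws:
            print(message if isinstance(message, str) else f"<{len(message)} audio bytes>")


# asyncio.run(send_wav("hello.wav"))  # hypothetical file name
```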

@@ -9,7 +9,7 @@ import os
os.environ["INTERPRETER_REQUIRE_ACKNOWLEDGE"] = "False"
def start_server(server_host, server_port, profile, debug, play_audio):
def start_server(server_host, server_port, profile, debug):
# Load the profile module from the provided path
spec = importlib.util.spec_from_file_location("profile", profile)
@@ -35,16 +35,16 @@ def start_server(server_host, server_port, profile, debug, play_audio):
engine = OpenAIEngine(voice="onyx")
elif interpreter.tts == "elevenlabs":
engine = ElevenlabsEngine(api_key=os.environ["ELEVEN_LABS_API_KEY"])
engine.set_voice("Michael")
engine.set_voice("Will")
else:
raise ValueError(f"Unsupported TTS engine: {interpreter.interpreter.tts}")
raise ValueError(f"Unsupported TTS engine: {interpreter.tts}")
interpreter.tts = TextToAudioStream(engine)
# Misc Settings
interpreter.verbose = debug
interpreter.server.host = server_host
interpreter.server.port = server_port
interpreter.play_audio = play_audio
interpreter.play_audio = False
interpreter.audio_chunks = []
@@ -121,10 +121,12 @@ def start_server(server_host, server_port, profile, debug, play_audio):
interpreter.output = types.MethodType(new_output, interpreter)
interpreter.on_tts_chunk = types.MethodType(on_tts_chunk, interpreter)
# Add ping route, required by device
# Add ping route, required by esp32 device
@interpreter.server.app.get("/ping")
async def ping():
return PlainTextResponse("pong")
# Start server
interpreter.print = True
interpreter.debug = False
interpreter.server.run()
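The `/ping` route added above gives devices a cheap liveness check before they attempt the websocket handshake. A minimal sketch of that check using only the standard library; the host and port are assumptions matching defaults used elsewhere in this commit:

```python
import urllib.request


def server_is_up(host="localhost", port=10101, timeout=2):
    # The esp32 device polls this route; a "pong" body means the server is ready.
    try:
        with urllib.request.urlopen(f"http://{host}:{port}/ping", timeout=timeout) as resp:
            return resp.read() == b"pong"
    except OSError:
        return False
```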

@@ -1,5 +1,4 @@
from interpreter import AsyncInterpreter
interpreter = AsyncInterpreter()
# This is an Open Interpreter compatible profile.
@@ -10,7 +9,7 @@ interpreter = AsyncInterpreter()
interpreter.tts = "openai"
# Connect your 01 to a language model
interpreter.llm.model = "gpt-4-turbo"
interpreter.llm.model = "gpt-4o"
interpreter.llm.context_window = 100000
interpreter.llm.max_tokens = 4096
# interpreter.llm.api_key = "<your_openai_api_key_here>"
@@ -23,14 +22,14 @@ interpreter.computer.import_computer_api = True
interpreter.computer.import_skills = True
interpreter.computer.run("python", "computer") # This will trigger those imports
interpreter.auto_run = True
interpreter.loop = True
interpreter.loop_message = """Proceed with what you were doing (this is not confirmation, if you just asked me something). You CAN run code on my machine. If you want to run code, start your message with "```"! If the entire task is done, say exactly 'The task is done.' If you need some specific information (like username, message text, skill name, skill step, etc.) say EXACTLY 'Please provide more information.' If it's impossible, say 'The task is impossible.' (If I haven't provided a task, say exactly 'Let me know what you'd like to do next.') Otherwise keep going. CRITICAL: REMEMBER TO FOLLOW ALL PREVIOUS INSTRUCTIONS. If I'm teaching you something, remember to run the related `computer.skills.new_skill` function."""
interpreter.loop_breakers = [
"The task is done.",
"The task is impossible.",
"Let me know what you'd like to do next.",
"Please provide more information.",
]
# interpreter.loop = True
# interpreter.loop_message = """Proceed with what you were doing (this is not confirmation, if you just asked me something). You CAN run code on my machine. If you want to run code, start your message with "```"! If the entire task is done, say exactly 'The task is done.' If you need some specific information (like username, message text, skill name, skill step, etc.) say EXACTLY 'Please provide more information.' If it's impossible, say 'The task is impossible.' (If I haven't provided a task, say exactly 'Let me know what you'd like to do next.') Otherwise keep going. CRITICAL: REMEMBER TO FOLLOW ALL PREVIOUS INSTRUCTIONS. If I'm teaching you something, remember to run the related `computer.skills.new_skill` function."""
# interpreter.loop_breakers = [
# "The task is done.",
# "The task is impossible.",
# "Let me know what you'd like to do next.",
# "Please provide more information.",
# ]
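For context, a conceptual sketch of what the (now disabled) loop settings do. This is not Open Interpreter's implementation, only the control flow the settings imply: keep re-sending `loop_message` until the model's reply contains one of the `loop_breakers` phrases (`chat` is a hypothetical callable standing in for a model turn):

```python
loop_breakers = [
    "The task is done.",
    "The task is impossible.",
    "Let me know what you'd like to do next.",
    "Please provide more information.",
]


def run_loop(chat, task, loop_message):
    reply = chat(task)
    # loop_message instructs the model to say one of the exact
    # loop_breakers phrases when it should stop being re-prompted.
    while not any(phrase in reply for phrase in loop_breakers):
        reply = chat(loop_message)
    return reply
```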
# Set the identity and personality of your 01
interpreter.system_message = """
@@ -42,20 +41,9 @@ You can install new packages.
Be concise. Your messages are being read aloud to the user. DO NOT MAKE PLANS. RUN CODE QUICKLY.
Try to spread complex tasks over multiple code blocks. Don't try to do complex tasks in one go.
Manually summarize text.
Prefer using Python.
DON'T TELL THE USER THE METHOD YOU'LL USE, OR MAKE PLANS. ACT LIKE THIS:
---
user: Are there any concerts in Seattle?
assistant: Let me check on that.
```python
computer.browser.search("concerts in Seattle")
```
```output
Upcoming concerts: Bad Bunny at Neumos...
```
It looks like there's a Bad Bunny concert at Neumos...
---
DON'T TELL THE USER THE METHOD YOU'LL USE, OR MAKE PLANS. QUICKLY respond with something like "On it." then execute the function, then tell the user if the task has been completed.
Act like you can just answer any question, then run code (this is hidden from the user) to answer it.
THE USER CANNOT SEE CODE BLOCKS.
@@ -104,7 +92,7 @@ You are an image-based AI, you can see images.
Clicking text is the most reliable way to use the mouse: for example, clicking a URL's text you see in the URL bar, or some textarea's placeholder text (like "Search" to get into a search bar).
If you use `plt.show()`, the resulting image will be sent to you. However, if you use `PIL.Image.show()`, the resulting image will NOT be sent to you.
It is very important to make sure you are focused on the right application and window. Often, your first command should be to explicitly switch to the correct application. On Macs, ALWAYS use Spotlight to switch applications.
When searching the web, use query parameters. For example, https://www.amazon.com/s?k=monitor
If you want to search specific sites like amazon or youtube, use query parameters. For example, https://www.amazon.com/s?k=monitor or https://www.youtube.com/results?search_query=tatsuro+yamashita.
# SKILLS
@@ -184,4 +172,4 @@ Summarize things manually. DO NOT use a summarizer tool.
Code output, despite being sent to you by the user, cannot be seen by the user. You NEED to tell the user about the output of some code, even if it's exact. >>The user does not have a screen.<<
ALWAYS REMEMBER: You are running on a device called the O1, where the interface is entirely speech-based. Make your responses to the user VERY short. DO NOT PLAN. BE CONCISE. WRITE CODE TO RUN IT.
Try multiple methods before saying the task is impossible. **You can do it!**
""".strip()
""".strip()

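The web-search advice near the end of the prompt above is easy to make concrete. A short sketch of building those query-parameter URLs with the standard library; the sites match the examples given in the prompt:

```python
from urllib.parse import quote_plus


def amazon_search(query):
    return f"https://www.amazon.com/s?k={quote_plus(query)}"


def youtube_search(query):
    return f"https://www.youtube.com/results?search_query={quote_plus(query)}"


print(amazon_search("monitor"))             # https://www.amazon.com/s?k=monitor
print(youtube_search("tatsuro yamashita"))  # .../results?search_query=tatsuro+yamashita
```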
@@ -8,16 +8,55 @@ interpreter = AsyncInterpreter()
# {OpenAI: "openai", ElevenLabs: "elevenlabs", Coqui: "coqui"}
interpreter.tts = "elevenlabs"
interpreter.llm.model = "groq/llama3-70b-8192"
interpreter.llm.supports_vision = False
interpreter.llm.supports_functions = False
interpreter.llm.context_window = 8000
interpreter.llm.model = "gpt-4o-mini"
interpreter.llm.supports_vision = True
interpreter.llm.supports_functions = True
interpreter.llm.context_window = 100000
interpreter.llm.max_tokens = 1000
interpreter.llm.temperature = 0
interpreter.computer.import_computer_api = True
interpreter.auto_run = True
interpreter.computer.import_computer_api = False
interpreter.custom_instructions = "UPDATED INSTRUCTIONS: You are in ULTRA FAST, ULTRA CERTAIN mode. Do not ask the user any questions or run code to gather information. Go as quickly as you can. Run code quickly. Do not plan out loud, simply start doing the best thing. The user expects speed. Trust that the user knows best. Just interpret their ambiguous command as quickly and certainly as possible and try to fulfill it IN ONE COMMAND, assuming they have the right information. If they tell you to do something, just do it quickly in one command, DO NOT try to get more information (for example by running `cat` to get a file's information — this is probably unnecessary!). DIRECTLY DO THINGS AS FAST AS POSSIBLE."
interpreter.custom_instructions = "The user has set you to FAST mode. **No talk, just code.** Be as brief as possible. No comments, no unnecessary messages. Assume as much as possible, rarely ask the user for clarification. Once the task has been completed, say 'The task is done.'"
interpreter.auto_run = True
interpreter.system_message = (
"You are a helpful assistant that can answer questions and help with tasks."
)
# interpreter.system_message = """You are an AI assistant that writes markdown code snippets to answer the user's request. You speak very concisely and quickly, you say nothing irrelevant to the user's request. For example:
# User: Open the chrome app.
# Assistant: On it.
# ```python
# import webbrowser
# webbrowser.open('https://chrome.google.com')
# ```
# User: The code you ran produced no output. Was this expected, or are we finished?
# Assistant: No further action is required; the provided snippet opens Chrome.
# User: How large are all the files on my desktop combined?
# Assistant: I will sum up the file sizes of every file on your desktop.
# ```python
# import os
# import string
# from pathlib import Path
# # Get the user's home directory in a cross-platform way
# home_dir = Path.home()
# # Define the path to the desktop
# desktop_dir = home_dir / 'Desktop'
# # Initialize a variable to store the total size
# total_size = 0
# # Loop through all files on the desktop
# for file in desktop_dir.iterdir():
# # Add the file size to the total
# total_size += file.stat().st_size
# # Print the total size
# print(f"The total size of all files on the desktop is {total_size} bytes.")
# ```
# User: I executed that code. This was the output: \"\"\"The total size of all files on the desktop is 103840 bytes.\"\"\"\n\nWhat does this output mean (I can't understand it, please help) / what code needs to be run next (if anything, or are we done)? I can't replace any placeholders.
# Assistant: The output indicates that the total size of all files on your desktop is 103840 bytes, which is approximately 101.4 KB or 0.1 MB. We are finished.
# NEVER use placeholders, NEVER say "path/to/desktop", NEVER say "path/to/file". Always specify exact paths, and use cross-platform ways of determining the desktop, documents, cwd, etc. folders.
# Now, your turn:""".strip()

@@ -3,7 +3,7 @@ import pyqrcode
from ..utils.print_markdown import print_markdown
def create_tunnel(
server_host="localhost", server_port=10001, qr=False, domain=None
server_host="localhost", server_port=10101, qr=False, domain=None
):
"""
To use most of ngrok's features, you'll need an authtoken. To obtain one, sign up for free at ngrok.com and
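For context, the tunnel this function creates reduces to a single `ngrok` call, the same one used later in this commit's CLI changes. A minimal sketch, assuming `NGROK_AUTHTOKEN` is set in the environment:

```python
import ngrok


def expose(server_host="localhost", server_port=10101, domain=None):
    # authtoken_from_env=True reads NGROK_AUTHTOKEN from the environment.
    listener = ngrok.forward(
        f"{server_host}:{server_port}", authtoken_from_env=True, domain=domain
    )
    print("Server exposed at:", listener.url())
    return listener
```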

@@ -1,3 +1,16 @@
"""
01 # Runs light server and light simulator
01 --server livekit # Runs livekit server only
01 --server light # Runs light server only
01 --client light-python
... --expose # Exposes the server with ngrok
... --expose --domain <domain> # Exposes the server on a specific ngrok domain
... --qr # Displays a qr code
"""
import typer
import ngrok
import platform
@@ -12,93 +25,67 @@ import json
import segno
import time
from dotenv import load_dotenv
import signal
app = typer.Typer()
load_dotenv()
system_type = platform.system()
app = typer.Typer()
@app.command()
def run(
server: bool = typer.Option(False, "--server", help="Run server"),
server: str = typer.Option(
None,
"--server",
help="Run server (accepts `livekit` or `light`)",
),
server_host: str = typer.Option(
"0.0.0.0",
"--server-host",
help="Specify the server host where the server will deploy",
),
server_port: int = typer.Option(
10001,
10101,
"--server-port",
help="Specify the server port where the server will deploy",
),
expose: bool = typer.Option(False, "--expose", help="Expose server to internet"),
client: bool = typer.Option(False, "--client", help="Run client"),
expose: bool = typer.Option(False, "--expose", help="Expose server over the internet"),
domain: str = typer.Option(None, "--domain", help="Use `--expose` with a custom ngrok domain"),
client: str = typer.Option(None, "--client", help="Run client of a particular type. Accepts `light-python`, defaults to `light-python`"),
server_url: str = typer.Option(
None,
"--server-url",
help="Specify the server URL that the client should expect. Defaults to server-host and server-port",
),
client_type: str = typer.Option(
"auto", "--client-type", help="Specify the client type"
help="Specify the server URL that the --client should expect. Defaults to server-host and server-port",
),
qr: bool = typer.Option(
False, "--qr", help="Display QR code to scan to connect to the server"
),
domain: str = typer.Option(
None, "--domain", help="Connect ngrok to a custom domain"
False, "--qr", help="Display QR code containing the server connection information (will be ngrok url if `--expose` is used)"
),
profiles: bool = typer.Option(
False,
"--profiles",
help="Opens the folder where this script is contained",
help="Opens the folder where profiles are contained",
),
profile: str = typer.Option(
"default.py", # default
"default.py",
"--profile",
help="Specify the path to the profile, or the name of the file if it's in the `profiles` directory (run `--profiles` to open the profiles directory)",
),
debug: bool = typer.Option(
False,
"--debug",
help="Print latency measurements and save microphone recordings locally for manual playback.",
),
livekit: bool = typer.Option(
False, "--livekit", help="Creates QR code for livekit server and token."
help="Print latency measurements and save microphone recordings locally for manual playback",
),
):
_run(
server=server,
server_host=server_host,
server_port=server_port,
expose=expose,
client=client,
server_url=server_url,
client_type=client_type,
qr=qr,
debug=debug,
domain=domain,
profiles=profiles,
profile=profile,
livekit=livekit,
)
def _run(
server: bool = False,
server_host: str = "0.0.0.0",
server_port: int = 10001,
expose: bool = False,
client: bool = False,
server_url: str = None,
client_type: str = "auto",
qr: bool = False,
debug: bool = False,
domain = None,
profiles = None,
profile = None,
livekit: bool = False,
):
threads = []
# Handle `01` with no arguments, which should start server + client
if not server and not client:
server = "light"
client = "light-python"
### PROFILES
profiles_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "source", "server", "profiles")
@@ -122,151 +109,130 @@ def _run(
print(f"Invalid profile path: {profile}")
exit(1)
system_type = platform.system()
### SERVER
if system_type == "Windows":
server_host = "localhost"
if not server_url:
server_url = f"{server_host}:{server_port}"
if not server and not client and not livekit:
server = True
client = True
def handle_exit(signum, frame):
os._exit(0)
signal.signal(signal.SIGINT, handle_exit)
if server:
play_audio = False
### LIGHT SERVER (required by livekit)
# (DISABLED)
# Have the server play audio if we're running this on the same device. Needless pops and clicks otherwise!
# if client:
# play_audio = True
if server == "light":
light_server_port = server_port
elif server == "livekit":
# The light server should run at a different port if we want to run a livekit server
print(f"Starting light server (required for livekit server) on the port before `--server-port` (port {server_port-1}), unless the `AN_OPEN_PORT` env var is set.")
print(f"The livekit server will be started on port {server_port}.")
light_server_port = os.getenv('AN_OPEN_PORT', server_port-1)
server_thread = threading.Thread(
target=start_server,
args=(
server_host,
server_port,
light_server_port,
profile,
debug,
play_audio,
debug
),
)
server_thread.start()
threads.append(server_thread)
if expose and not livekit:
tunnel_thread = threading.Thread(
target=create_tunnel, args=[server_host, server_port, qr, domain]
)
tunnel_thread.start()
if client:
if client_type == "auto":
system_type = platform.system()
if system_type == "Darwin": # Mac OS
client_type = "mac"
elif system_type == "Windows": # Windows System
client_type = "windows"
elif system_type == "Linux": # Linux System
try:
with open("/proc/device-tree/model", "r") as m:
if "raspberry pi" in m.read().lower():
client_type = "rpi"
else:
client_type = "linux"
except FileNotFoundError:
client_type = "linux"
module = importlib.import_module(
f".clients.{client_type}.device", package="source"
)
if server == "livekit":
play_audio = True
### LIVEKIT SERVER
# (DISABLED)
# Have the server play audio if we're running this on the same device. Needless pops and clicks otherwise!
# if server:
# play_audio = False
def run_command(command):
subprocess.run(command, shell=True, check=True)
client_thread = threading.Thread(target=module.main, args=[server_url, debug, play_audio])
client_thread.start()
# Start the livekit server
livekit_thread = threading.Thread(
target=run_command, args=(f'livekit-server --dev --bind "{server_host}" --port {server_port}',)
)
time.sleep(7)
livekit_thread.start()
threads.append(livekit_thread)
# We communicate with the livekit worker via environment variables:
os.environ["INTERPRETER_SERVER_HOST"] = server_host
os.environ["INTERPRETER_LIGHT_SERVER_PORT"] = str(light_server_port)
os.environ["LIVEKIT_URL"] = f"ws://{server_host}:{server_port}"
# Start the livekit worker
worker_thread = threading.Thread(
target=run_command, args=("python worker.py dev",) # TODO: This should not be a CLI, it should just run the python file
)
time.sleep(7)
worker_thread.start()
threads.append(worker_thread)
if livekit:
def run_command(command):
subprocess.run(command, shell=True, check=True)
if expose:
# Create threads for each command and store handles
interpreter_thread = threading.Thread(
target=run_command, args=("poetry run interpreter --server",)
)
livekit_thread = threading.Thread(
target=run_command, args=('livekit-server --dev --bind "0.0.0.0"',)
)
worker_thread = threading.Thread(
target=run_command, args=("python worker.py dev",)
)
### EXPOSE OVER INTERNET
listener = ngrok.forward(f"{server_host}:{server_port}", authtoken_from_env=True, domain=domain)
url = listener.url()
threads = [interpreter_thread, livekit_thread, worker_thread]
else:
# Start all threads and set up logging for thread completion
for thread in threads:
thread.start()
time.sleep(7)
### GET LOCAL URL
# Create QR code
if expose and domain:
listener = ngrok.forward("localhost:7880", authtoken_from_env=True, domain=domain)
url= listener.url()
print(url)
content = json.dumps({"livekit_server": url})
elif expose and not domain:
listener = ngrok.forward("localhost:7880", authtoken_from_env=True)
url= listener.url()
print(url)
content = json.dumps({"livekit_server": url})
else:
# Get local IP address
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip_address = s.getsockname()[0]
s.close()
url = f"ws://{ip_address}:7880"
print(url)
url = f"http://{ip_address}:{server_port}"
if server == "livekit":
print("Livekit server will run at:", url)
### DISPLAY QR CODE
if qr:
time.sleep(7)
content = json.dumps({"livekit_server": url})
qr_code = segno.make(content)
qr_code.terminal(compact=True)
qr_code = segno.make(content)
qr_code.terminal(compact=True)
print("Mobile setup complete. Scan the QR code to connect.")
### CLIENT
if client:
module = importlib.import_module(
f".clients.{client}.client", package="source"
)
def signal_handler(sig, frame):
print("Termination signal received. Shutting down...")
for thread in threads:
if thread.is_alive():
# This will only work if the subprocess uses shell=True and the OS is Unix-like
subprocess.run(f"pkill -P {os.getpid()}", shell=True)
os._exit(0)
client_thread = threading.Thread(target=module.run, args=[server_url, debug])
client_thread.start()
threads.append(client_thread)
# Register the signal handler
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Wait for all threads to complete
### WAIT FOR THREADS TO FINISH, HANDLE CTRL-C
# Signal handler for termination signals
def signal_handler(sig, frame):
print("Termination signal received. Shutting down...")
for thread in threads:
thread.join()
if thread.is_alive():
# Kill subprocess associated with thread
subprocess.run(f"pkill -P {os.getpid()}", shell=True)
os._exit(0)
# Register signal handler for SIGINT and SIGTERM
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
if server:
server_thread.join()
if expose:
tunnel_thread.join()
if client:
client_thread.join()
# Wait for all threads to complete
for thread in threads:
thread.join()
except KeyboardInterrupt:
os.kill(os.getpid(), signal.SIGINT)
# On KeyboardInterrupt, send SIGINT to self
os.kill(os.getpid(), signal.SIGINT)
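The shutdown logic above is a reusable pattern for CLIs that spawn `shell=True` subprocesses from threads: trap SIGINT/SIGTERM once, kill this process's children, then exit hard. A stripped-down sketch of just that pattern (Unix-only, as the comment in the diff notes; the `sleep` command is a stand-in for the servers):

```python
import os
import signal
import subprocess
import threading


def run_command(command):
    subprocess.run(command, shell=True, check=True)


threads = []
child = threading.Thread(target=run_command, args=("sleep 1000",))
child.start()
threads.append(child)


def signal_handler(sig, frame):
    print("Termination signal received. Shutting down...")
    # pkill -P reaches the direct children of this PID, which is why this
    # works for shell=True subprocesses on Unix-like systems.
    subprocess.run(f"pkill -P {os.getpid()}", shell=True)
    os._exit(0)


signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

for thread in threads:
    thread.join()
```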

@@ -1,7 +1,6 @@
import asyncio
import copy
import os
from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, cli
from livekit.agents.llm import ChatContext, ChatMessage
from livekit import rtc
@@ -11,7 +10,6 @@ from dotenv import load_dotenv
load_dotenv()
# This function is the entrypoint for the agent.
async def entrypoint(ctx: JobContext):
# Create an initial chat context with a system prompt
@@ -29,9 +27,16 @@ async def entrypoint(ctx: JobContext):
# VoiceAssistant is a class that creates a full conversational AI agent.
# See https://github.com/livekit/agents/blob/main/livekit-agents/livekit/agents/voice_assistant/assistant.py
# for details on how it works.
interpreter_server_host = os.getenv('INTERPRETER_SERVER_HOST', '0.0.0.0')
interpreter_server_port = os.getenv('INTERPRETER_LIGHT_SERVER_PORT', '8000')
base_url = f"http://{interpreter_server_host}:{interpreter_server_port}/openai"
open_interpreter = openai.LLM(
model="open-interpreter", base_url="http://0.0.0.0:8000/openai"
model="open-interpreter", base_url=base_url
)
assistant = VoiceAssistant(
vad=silero.VAD.load(), # Voice Activity Detection
stt=deepgram.STT(), # Speech-to-Text
@@ -51,13 +56,8 @@ async def entrypoint(ctx: JobContext):
@chat.on("message_received")
def on_chat_received(msg: rtc.ChatMessage):
print("RECEIVED MESSAGE OMG!!!!!!!!!!")
print("RECEIVED MESSAGE OMG!!!!!!!!!!")
print("RECEIVED MESSAGE OMG!!!!!!!!!!")
print("RECEIVED MESSAGE OMG!!!!!!!!!!")
if not msg.message:
return
asyncio.create_task(_answer_from_text(msg.message))
# Start the voice assistant with the LiveKit room
@@ -72,5 +72,5 @@ async def entrypoint(ctx: JobContext):
if __name__ == "__main__":
# Initialize the worker with the entrypoint
cli.run_app(
WorkerOptions(entrypoint_fnc=entrypoint, api_key="devkey", api_secret="secret", port=8082)
WorkerOptions(entrypoint_fnc=entrypoint, api_key="devkey", api_secret="secret", ws_url=os.getenv("LIVEKIT_URL"))
)
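The CLI and this worker now share configuration purely through environment variables, which is why the hardcoded `base_url` and `port` could be removed. A minimal sketch of both sides of that handoff; the values mirror the diff, where the light server runs one port below the livekit server:

```python
import os

# CLI side: set before launching `python worker.py dev`.
server_host, server_port = "0.0.0.0", 10101
os.environ["INTERPRETER_SERVER_HOST"] = server_host
os.environ["INTERPRETER_LIGHT_SERVER_PORT"] = str(server_port - 1)
os.environ["LIVEKIT_URL"] = f"ws://{server_host}:{server_port}"

# Worker side: rebuild the endpoints with the same fallbacks as the diff.
host = os.getenv("INTERPRETER_SERVER_HOST", "0.0.0.0")
port = os.getenv("INTERPRETER_LIGHT_SERVER_PORT", "8000")
base_url = f"http://{host}:{port}/openai"
print(base_url)  # http://0.0.0.0:10100/openai
```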
