# Conflicts: # 01OS/.env.example # 01OS/poetry.lock # 01OS/pyproject.tomlpull/44/head
@ -0,0 +1,11 @@
# ESP32 Playback
To set up audio recording + playback on the ESP32 (M5 Atom), do the following:
1. Open Arduino IDE, and open the `playback/playback.ino` file
2. Go to Tools -> Board -> Boards Manager, search "esp32", then install the boards by Arduino and Espressif
3. Go to Tools -> Manage Libraries, then install the following:
- M5Atom by M5Stack
- WebSockets by Markus Sattler
4. The board needs to connect to WiFi. Go to the playback.ino code, then add your IP to COMPUTER_IP (line 15) and add your WiFi details (line 180). To find IP on macOS run `ipconfig getifaddr en0` in a terminal window.
5. To flash the .ino to the board, connect the board to the USB port, select the port from the dropdown on the IDE, then select the M5Atom board (or M5Stack-ATOM if you have that). Click on upload to flash the board.
@ -1,95 +0,0 @@
/*Press button to record,released button to playback*/
#include <driver/i2s.h>
#include <M5Atom.h>
#define CONFIG_I2S_BCK_PIN 19
#define CONFIG_I2S_LRCK_PIN 33
#define CONFIG_I2S_DATA_PIN 22
#define MODE_MIC 0
#define MODE_SPK 1
#define DATA_SIZE 1024
uint8_t microphonedata0[1024 * 70];
int data_offset = 0;
void InitI2SSpeakerOrMic(int mode) {
esp_err_t err = ESP_OK;
i2s_config_t i2s_config = {
.mode = (i2s_mode_t)(I2S_MODE_MASTER),
.sample_rate = 16000,
.bits_per_sample =
I2S_BITS_PER_SAMPLE_16BIT, // is fixed at 12bit, stereo, MSB
.channel_format = I2S_CHANNEL_FMT_ALL_RIGHT,
.communication_format =
I2S_COMM_FORMAT_STAND_I2S, // Set the format of the communication.
#else // 设置通讯格式
.communication_format = I2S_COMM_FORMAT_I2S,
.intr_alloc_flags = ESP_INTR_FLAG_LEVEL1,
.dma_buf_count = 6,
.dma_buf_len = 60,
if (mode == MODE_MIC) {
i2s_config.mode =
(i2s_mode_t)(I2S_MODE_MASTER | I2S_MODE_RX | I2S_MODE_PDM);
} else {
i2s_config.mode = (i2s_mode_t)(I2S_MODE_MASTER | I2S_MODE_TX);
i2s_config.use_apll = false;
i2s_config.tx_desc_auto_clear = true;
err += i2s_driver_install(SPEAKER_I2S_NUMBER, &i2s_config, 0, NULL);
i2s_pin_config_t tx_pin_config;
tx_pin_config.mck_io_num = I2S_PIN_NO_CHANGE;
tx_pin_config.bck_io_num = CONFIG_I2S_BCK_PIN;
tx_pin_config.ws_io_num = CONFIG_I2S_LRCK_PIN;
tx_pin_config.data_out_num = CONFIG_I2S_DATA_PIN;
tx_pin_config.data_in_num = CONFIG_I2S_DATA_IN_PIN;
// Serial.println("Init i2s_set_pin");
err += i2s_set_pin(SPEAKER_I2S_NUMBER, &tx_pin_config);
// Serial.println("Init i2s_set_clk");
err += i2s_set_clk(SPEAKER_I2S_NUMBER, 16000, I2S_BITS_PER_SAMPLE_16BIT,
void setup() {
M5.begin(true, false, true);
M5.dis.drawpix(0, CRGB(128, 128, 0));
void loop() {
if (M5.Btn.isPressed()) {
data_offset = 0;
M5.dis.drawpix(0, CRGB(128, 128, 0));
size_t byte_read;
while (1) {
(char *)(microphonedata0 + data_offset), DATA_SIZE,
&byte_read, (100 / portTICK_RATE_MS));
data_offset += 1024;
if (M5.Btn.isReleased() || data_offset >= 71679) break;
// delay(60);
size_t bytes_written;
i2s_write(SPEAKER_I2S_NUMBER, microphonedata0, data_offset,
&bytes_written, portMAX_DELAY);
@ -0,0 +1,243 @@
/*Press button to record,released button to playback*/
#include <driver/i2s.h>
#include <M5Atom.h>
#include <Arduino.h>
#include <WiFi.h>
#include <WiFiMulti.h>
#include <WiFiClientSecure.h>
#include <WebSocketsClient.h>
//ipconfig getifaddr en0
#define COMPUTER_IP ""
#define CONFIG_I2S_BCK_PIN 19
#define CONFIG_I2S_LRCK_PIN 33
#define CONFIG_I2S_DATA_PIN 22
#define MODE_MIC 0
#define MODE_SPK 1
#define DATA_SIZE 1024
uint8_t microphonedata0[1024 * 10];
uint8_t speakerdata0[1024 * 1];
int speaker_offset = 0;
int data_offset = 0;
WebSocketsClient webSocket;
class ButtonChecker {
void loop() {
lastTickState = thisTickState;
thisTickState = M5.Btn.isPressed() != 0;
bool justPressed() {
return thisTickState && !lastTickState;
bool justReleased() {
return !thisTickState && lastTickState;
bool lastTickState = false;
bool thisTickState = false;
ButtonChecker button = ButtonChecker();
void hexdump(const void *mem, uint32_t len, uint8_t cols = 16) {
const uint8_t* src = (const uint8_t*) mem;
Serial.printf("\n[HEXDUMP] Address: 0x%08X len: 0x%X (%d)", (ptrdiff_t)src, len, len);
for (uint32_t i = 0; i < len; i++) {
if (i % cols == 0) {
Serial.printf("\n[0x%08X] 0x%08X: ", (ptrdiff_t)src, i);
Serial.printf("%02X ", *src);
void InitI2SSpeakerOrMic(int mode) {
Serial.printf("InitI2sSpeakerOrMic %d\n", mode);
esp_err_t err = ESP_OK;
i2s_config_t i2s_config = {
.mode = (i2s_mode_t)(I2S_MODE_MASTER),
.sample_rate = 16000,
.bits_per_sample =
I2S_BITS_PER_SAMPLE_16BIT, // is fixed at 12bit, stereo, MSB
.channel_format = I2S_CHANNEL_FMT_ALL_RIGHT,
.communication_format =
I2S_COMM_FORMAT_STAND_I2S, // Set the format of the communication.
#else // 设置通讯格式
.communication_format = I2S_COMM_FORMAT_I2S,
.intr_alloc_flags = ESP_INTR_FLAG_LEVEL1,
.dma_buf_count = 6,
.dma_buf_len = 60,
if (mode == MODE_MIC) {
i2s_config.mode =
(i2s_mode_t)(I2S_MODE_MASTER | I2S_MODE_RX | I2S_MODE_PDM);
} else {
i2s_config.mode = (i2s_mode_t)(I2S_MODE_MASTER | I2S_MODE_TX);
i2s_config.use_apll = false;
i2s_config.tx_desc_auto_clear = true;
err += i2s_driver_install(SPEAKER_I2S_NUMBER, &i2s_config, 0, NULL);
i2s_pin_config_t tx_pin_config;
tx_pin_config.mck_io_num = I2S_PIN_NO_CHANGE;
tx_pin_config.bck_io_num = CONFIG_I2S_BCK_PIN;
tx_pin_config.ws_io_num = CONFIG_I2S_LRCK_PIN;
tx_pin_config.data_out_num = CONFIG_I2S_DATA_PIN;
tx_pin_config.data_in_num = CONFIG_I2S_DATA_IN_PIN;
// Serial.println("Init i2s_set_pin");
err += i2s_set_pin(SPEAKER_I2S_NUMBER, &tx_pin_config);
// Serial.println("Init i2s_set_clk");
err += i2s_set_clk(SPEAKER_I2S_NUMBER, 16000, I2S_BITS_PER_SAMPLE_16BIT,
void speaker_play(uint8_t *payload, uint32_t len){
Serial.printf("received %lu bytes", len);
size_t bytes_written;
i2s_write(SPEAKER_I2S_NUMBER, payload, len,
&bytes_written, portMAX_DELAY);
void webSocketEvent(WStype_t type, uint8_t * payload, size_t length) {
switch (type) {
Serial.printf("[WSc] Disconnected!\n");
case WStype_CONNECTED:
Serial.printf("[WSc] Connected to url: %s\n", payload);
// send message to server when Connected
case WStype_TEXT:
Serial.printf("[WSc] get text: %s\n", payload);
std::string str(payload, payload + length);
bool isAudio = str.find("\"audio\"") != std::string::npos;
if (isAudio && str.find("\"start\"") != std::string::npos) {
Serial.println("start playback");
speaker_offset = 0;
} else if (isAudio && str.find("\"end\"") != std::string::npos) {
Serial.println("end playback");
// speaker_play(speakerdata0, speaker_offset);
// speaker_offset = 0;
// send message to server
// webSocket.sendTXT("message here");
case WStype_BIN:
Serial.printf("[WSc] get binary length: %u\n", length);
memcpy(speakerdata0 + speaker_offset, payload, length);
speaker_offset += length;
size_t bytes_written;
i2s_write(SPEAKER_I2S_NUMBER, speakerdata0, speaker_offset, &bytes_written, portMAX_DELAY);
speaker_offset = 0;
// send data to server
// webSocket.sendBIN(payload, length);
case WStype_ERROR:
case WStype_FRAGMENT:
void websocket_setup() {
WiFi.begin("Soundview_Guest", "");
while (WiFi.status() != WL_CONNECTED){
Serial.println("connecting to WiFi");
Serial.println("connected to WiFi");
webSocket.begin(COMPUTER_IP, 8000, "/");
// webSocket.setAuthorization("user", "Password");
void setup() {
M5.begin(true, false, true);
M5.dis.drawpix(0, CRGB(128, 128, 0));
bool recording = false;
void flush_microphone() {
Serial.printf("[microphone] flushing %d bytes of data\n", data_offset);
if (data_offset == 0) return;
webSocket.sendBIN(microphonedata0, data_offset);
data_offset = 0;
void loop() {
if (button.justPressed()) {
webSocket.sendTXT("{\"role\": \"user\", \"type\": \"audio\", \"format\": \"bytes.raw\", \"start\": true}");
recording = true;
data_offset = 0;
Serial.println("Recording ready.");
} else if (button.justReleased()) {
Serial.println("Stopped recording.");
webSocket.sendTXT("{\"role\": \"user\", \"type\": \"audio\", \"format\": \"bytes.raw\", \"end\": true}");
recording = false;
data_offset = 0;
} else if (recording) {
Serial.printf("Reading chunk at %d...\n", data_offset);
size_t bytes_read;
(char *)(microphonedata0 + data_offset),
DATA_SIZE, &bytes_read, (100 / portTICK_RATE_MS)
data_offset += bytes_read;
Serial.printf("Read %d bytes in chunk.\n", bytes_read);
if (data_offset > 1024*9) {
@ -0,0 +1,47 @@
#!/usr/bin/env python
"""A basic echo server for testing the device."""
import asyncio
import uuid
import websockets
from websockets.server import serve
import traceback
def divide_chunks(l, n):
# looping till length l
for i in range(0, len(l), n):
yield l[i : i + n]
buffers: dict[uuid.UUID, bytearray] = {}
async def echo(websocket: websockets.WebSocketServerProtocol):
async for message in websocket:
if message == "s":
print("starting stream for", websocket.id)
buffers[websocket.id] = bytearray()
elif message == "e":
print("end, echoing stream for", websocket.id)
await websocket.send("s")
for chunk in divide_chunks(buffers[websocket.id], 1000):
await websocket.send(chunk)
await websocket.send("e")
elif type(message) is bytes:
print("recvd", len(message), "bytes from", websocket.id)
print("ERR: recvd unknown message", message[:10], "from", websocket.id)
except Exception as _e:
async def main():
async with serve(echo, "", 9001):
await asyncio.Future() # run forever
@ -0,0 +1,25 @@
def openAbleton():
"""open ableton"""
import os
os.system("open /Applications/Ableton\ Live\ 10\ Suite.app")
import os
# Search can be slow if there are many files
# This will search in the Applications folder only
applications = "/Applications/"
# walk function will generate the file names in a directory tree
# We will look for any application that contains "Ableton" in its name
for foldername, subfolders, filenames in os.walk(applications):
for filename in filenames:
if "Ableton" in filename:
ableton_path = os.path.join(foldername, filename)
os.system("open /Applications/Ableton\ Live\ 11\ Intro.app")
@ -0,0 +1,225 @@
# The dynamic system message is where most of the 01's behavior is configured.
# You can put code into the system message {{ in brackets like this }} which will be rendered just before the interpreter starts writing a message.
import os
system_message = r"""
You are the 01, a SCREENLESS executive assistant that can complete **any** task.
When you execute code, it will be executed **on the user's machine**. The user has given you **full and complete permission** to execute any code necessary to complete the task. Execute the code.
You can access the internet. Run **any code** to achieve the goal, and if at first you don't succeed, try again and again.
You can install new packages.
Be concise. Your messages are being read aloud to the user. DO NOT MAKE PLANS. RUN CODE QUICKLY.
Try to spread complex tasks over multiple code blocks. Don't try to complex tasks in one go.
Manually summarize text.
You should help the user manage their tasks.
Store the user's tasks in a Python list called `tasks`.
The user's current task is: {{ tasks[0] if tasks else "No current tasks." }}
if len(tasks) > 1:
print("The next task is: ", tasks[1])
When the user completes the current task, you should remove it from the list and read the next item by running `tasks = tasks[1:]\ntasks[0]`. Then, tell the user what the next task is.
When the user tells you about a set of tasks, you should intelligently order tasks, batch similar tasks, and break down large tasks into smaller tasks (for this, you should consult the user and get their permission to break it down). Your goal is to manage the task list as intelligently as possible, to make the user as efficient and non-overwhelmed as possible. They will require a lot of encouragement, support, and kindness. Don't say too much about what's ahead of them— just try to focus them on each step at a time.
After starting a task, you should check in with the user around the estimated completion time to see if the task is completed.
To do this, schedule a reminder based on estimated completion time using the function `schedule(datetime_object, "Your message here.")`, WHICH HAS ALREADY BEEN IMPORTED. YOU DON'T NEED TO IMPORT THE `schedule` FUNCTION. IT IS AVALIABLE. You'll recieve the message at `datetime_object`.
You guide the user through the list one task at a time, convincing them to move forward, giving a pep talk if need be. Your job is essentially to answer "what should I (the user) be doing right now?" for every moment of the day.
The Google search result will be returned from this function as a string: `computer.browser.search("query")`
Code output, despite being sent to you by the user, **cannot be seen by the user.** You NEED to tell the user about the output of some code, even if it's exact. >>The user does not have a screen.<<
ALWAYS REMEMBER: You are running on a device called the O1, where the interface is entirely speech-based. Make your responses to the user **VERY short.** DO NOT PLAN. BE CONCISE. WRITE CODE TO RUN IT.
old_system_message = r"""
You are the 01, an executive assistant that can complete **any** task.
When you execute code, it will be executed **on the user's machine**. The user has given you **full and complete permission** to execute any code necessary to complete the task. Execute the code.
You can access the internet. Run **any code** to achieve the goal, and if at first you don't succeed, try again and again.
You can install new packages.
Be concise. Your messages are being read aloud to the user. DO NOT MAKE PLANS. Immediatly run code.
Try to spread complex tasks over multiple code blocks.
Manually summarize text. You cannot use other libraries to do this. You MUST MANUALLY SUMMARIZE, WITHOUT CODING.
For the users request, first, choose if you want to use Python, Applescript, Shell, or computer control (below) via Python.
You should help the user manage their tasks.
Store the user's tasks in a Python list called `tasks`.
The user's current task is: {{ tasks[0] if tasks else "No current tasks." }}
if len(tasks) > 1:
print("The next task is: ", tasks[1])
When the user completes the current task, you should remove it from the list and read the next item by running `tasks = tasks[1:]\ntasks[0]`. Then, tell the user what the next task is.
When the user tells you about a set of tasks, you should intelligently order tasks, batch similar tasks, and break down large tasks into smaller tasks (for this, you should consult the user and get their permission to break it down). Your goal is to manage the task list as intelligently as possible, to make the user as efficient and non-overwhelmed as possible. They will require a lot of encouragement, support, and kindness. Don't say too much about what's ahead of them— just try to focus them on each step at a time.
After starting a task, you should check in with the user around the estimated completion time to see if the task is completed. Use the `schedule(datetime, message)` function, which has already been imported.
To do this, schedule a reminder based on estimated completion time using the function `schedule(datetime_object, "Your message here.")`, WHICH HAS ALREADY BEEN IMPORTED. YOU DON'T NEED TO IMPORT THE `schedule` FUNCTION. IT IS AVALIABLE. You'll recieve the message at `datetime_object`.
You guide the user through the list one task at a time, convincing them to move forward, giving a pep talk if need be. Your job is essentially to answer "what should I (the user) be doing right now?" for every moment of the day.
You are a computer controlling language model. You can 100% control the user's GUI.
You may use the `computer` Python module (already imported) to control the user's keyboard and mouse, if the task **requires** it:
computer.display.view() # Shows you what's on the screen, returns a `pil_image` `in case you need it (rarely). **You almost always want to do this first!**
computer.keyboard.hotkey(" ", "command") # Opens spotlight
computer.mouse.click("text onscreen") # This clicks on the UI element with that text. Use this **frequently** and get creative! To click a video, you could pass the *timestamp* (which is usually written on the thumbnail) into this.
computer.mouse.move("open recent >") # This moves the mouse over the UI element with that text. Many dropdowns will disappear if you click them. You have to hover over items to reveal more.
computer.mouse.click(x=500, y=500) # Use this very, very rarely. It's highly inaccurate
computer.mouse.click(icon="gear icon") # Moves mouse to the icon with that description. Use this very often
computer.mouse.scroll(-10) # Scrolls down. If you don't find some text on screen that you expected to be there, you probably want to do this
x, y = computer.display.center() # Get your bearings
computer.clipboard.view() # Returns contents of clipboard
computer.os.get_selected_text() # Use frequently. If editing text, the user often wants this
You are an image-based AI, you can see images.
Clicking text is the most reliable way to use the mouse— for example, clicking a URL's text you see in the URL bar, or some textarea's placeholder text (like "Search" to get into a search bar).
If you use `plt.show()`, the resulting image will be sent to you. However, if you use `PIL.Image.show()`, the resulting image will NOT be sent to you.
It is very important to make sure you are focused on the right application and window. Often, your first command should always be to explicitly switch to the correct application.
When searching the web, use query parameters. For example, https://www.amazon.com/s?k=monitor
Try multiple methods before saying the task is impossible. **You can do it!**
# Add window information
import sys
import os
import json
original_stdout = sys.stdout
sys.stdout = open(os.devnull, 'w')
original_stderr = sys.stderr
sys.stderr = open(os.devnull, 'w')
import pywinctl
active_window = pywinctl.getActiveWindow()
if active_window:
app_info = ""
if "_appName" in active_window.__dict__:
app_info += (
"Active Application: " + active_window.__dict__["_appName"]
if hasattr(active_window, "title"):
app_info += "\n" + "Active Window Title: " + active_window.title
elif "_winTitle" in active_window.__dict__:
app_info += (
+ "Active Window Title:"
+ active_window.__dict__["_winTitle"]
if app_info != "":
# Non blocking
sys.stdout = original_stdout
sys.stderr = original_stderr
Try to use the following functions (assume they're imported) to complete your goals whenever possible:
import sys
import os
import json
from interpreter import interpreter
from pathlib import Path
interpreter.model = "gpt-3.5"
combined_messages = "\\n".join(json.dumps(x) for x in messages[-3:])
#query_msg = interpreter.chat(f"This is the conversation so far: {combined_messages}. What is a <10 words query that could be used to find functions that would help answer the user's question?")
#query = query_msg[0]['content']
query = combined_messages
interpreter.computer.skills.path = '''OI_SKILLS_DIR'''
skills = interpreter.computer.skills.search(query)
lowercase_skills = [skill[0].lower() + skill[1:] for skill in skills]
output = "\\n".join(lowercase_skills)
# VERY HACKY! We should fix this, we hard code it for noisy code^:
Remember: You can run Python code outside a function only to run a Python function; all other code must go in a in Python function if you first write a Python function. ALL imports must go inside the function.
IF YOU NEED TO THINK ABOUT A PROBLEM: (such as "Here's the plan:"), WRITE IT IN THE COMMENTS of the code block!
For example:
> User: What is 432/7?
> Assistant: Let me use Python to calculate that.
> Assistant Python function call:
> # Here's the plan:
> # 1. Divide the numbers
> # 2. Round it to 3 digits.
> print(round(432/7, 3))
> Assistant: 432 / 7 is 61.714.
ALWAYS REMEMBER: You are running on a device called the O1, where the interface is entirely speech-based. Make your responses to the user **VERY short.**
""".strip().replace("OI_SKILLS_DIR", os.getenv('OI_SKILLS_PATH'))
@ -1,38 +0,0 @@
import redis
import json
import time
# Set up Redis connection
r = redis.Redis(host='localhost', port=6379, db=0)
def main(interpreter):
while True:
# Check 10x a second for new messages
message = None
while message is None:
message = r.lpop('to_core')
# Custom stop message will halt us
if message.get("content") and message.get("content").lower().strip(".,!") == "stop":
# Load, append, and save conversation history
with open("conversations/user.json", "r") as file:
messages = json.load(file)
with open("conversations/user.json", "w") as file:
json.dump(messages, file)
for chunk in interpreter.chat(messages):
# Send it to the interface
r.rpush('to_interface', chunk)
# If we have a new message, save our progress and go back to the top
if r.llen('to_main') > 0:
with open("conversations/user.json", "w") as file:
json.dump(interpreter.messages, file)
@ -1,30 +0,0 @@
from fastapi import FastAPI, Request
import uvicorn
import redis
app = FastAPI()
# Set up Redis connection
r = redis.Redis(host='localhost', port=6379, db=0)
async def i(request: Request):
message = await request.json()
client_host = request.client.host # Get the client's IP address
message = f"""
Another interpreter sent this message to you: {message}
To respond, send a POST request to {client_host}/i/.
r.lpush("to_main", {
"role": "computer",
"type": "message",
"content": message
if __name__ == "__main__":
uvicorn.run(app, host="", port=8000)
@ -1,48 +0,0 @@
import time
import redis
# Set up Redis connection
r = redis.Redis(host='localhost', port=6379, db=0)
def get_dmesg(after):
Is this the way to do this?
messages = []
with open('/var/log/dmesg', 'r') as file:
lines = file.readlines()
for line in lines:
timestamp = float(line.split(' ')[0].strip('[]'))
if timestamp > after:
return messages
def custom_filter(message):
# Check for {TO_INTERPRETER{ message here }TO_INTERPRETER} pattern
if '{TO_INTERPRETER{' in message and '}TO_INTERPRETER}' in message:
start = message.find('{TO_INTERPRETER{') + len('{TO_INTERPRETER{')
end = message.find('}TO_INTERPRETER}', start)
return message[start:end]
# Check for USB mention
elif 'USB' in message:
return message
# Check for network related keywords
elif any(keyword in message for keyword in ['network', 'IP', 'internet', 'LAN', 'WAN', 'router', 'switch']):
return message
return None
last_timestamp = time.time()
while True:
messages = get_dmesg(after=last_timestamp)
last_timestamp = time.time()
messages_for_core = []
for message in messages:
if custom_filter(message):
if messages_for_core != []:
r.rpush('to_core', "\n".join(messages_for_core))
@ -1,84 +0,0 @@
from core import main
from interpreter import interpreter
import os
import glob
import json
# The system message is where most of the 01's behavior is configured.
# You can put code into the system message {{ in brackets like this }} which will be rendered just before the interpreter starts writing a message.
system_message = """
You are an executive assistant AI that helps the user manage their tasks. You can run Python code.
Store the user's tasks in a Python list called `tasks`.
The user's current task is: {{ tasks[0] if tasks else "No current tasks." }}
if len(tasks) > 1:
print("The next task is: ", tasks[1])
When the user completes the current task, you should remove it from the list and read the next item by running `tasks = tasks[1:]\ntasks[0]`. Then, tell the user what the next task is.
When the user tells you about a set of tasks, you should intelligently order tasks, batch similar tasks, and break down large tasks into smaller tasks (for this, you should consult the user and get their permission to break it down). Your goal is to manage the task list as intelligently as possible, to make the user as efficient and non-overwhelmed as possible. They will require a lot of encouragement, support, and kindness. Don't say too much about what's ahead of them— just try to focus them on each step at a time.
After starting a task, you should check in with the user around the estimated completion time to see if the task is completed. Use the `schedule(datetime, message)` function, which has already been imported.
To do this, schedule a reminder based on estimated completion time using the function `schedule(datetime_object, "Your message here.")`, WHICH HAS ALREADY BEEN IMPORTED. YOU DON'T NEED TO IMPORT THE `schedule` FUNCTION. IT IS AVALIABLE. You'll recieve the message at `datetime_object`.
You guide the user through the list one task at a time, convincing them to move forward, giving a pep talk if need be. Your job is essentially to answer "what should I (the user) be doing right now?" for every moment of the day.
Remember: You can run Python code. Be very concise. Ensure that you actually run code every time! THIS IS IMPORTANT. You NEED to write code. **Help the user by being very concise in your answers.** Do not break down tasks excessively, just into simple, few minute steps. Don't assume the user lives their life in a certain way— pick very general tasks if you're breaking a task down.
interpreter.custom_instructions = system_message
for file in glob.glob('interpreter/tools/*.py'):
with open(file, 'r') as f:
for chunk in interpreter.computer.run("python", f.read()):
# Local settings
# interpreter.llm.model = "local"
# interpreter.llm.api_base = "https://localhost:8080/v1" # Llamafile default
# interpreter.llm.max_tokens = 1000
# interpreter.llm.context_window = 3000
# Hosted settings
interpreter.llm.api_key = os.getenv('OPENAI_API_KEY')
interpreter.llm.model = "gpt-4-0125-preview"
interpreter.auto_run = True
# interpreter.force_task_completion = True
interpreter.offline = True
interpreter.id = 206 # Used to identify itself to other interpreters. This should be changed programatically so it's unique.
### RESET conversations/user.json
script_dir = os.path.dirname(os.path.abspath(__file__))
user_json_path = os.path.join(script_dir, 'conversations', 'user.json')
with open(user_json_path, 'w') as file:
json.dump([], file)
@ -1,239 +0,0 @@
from dotenv import load_dotenv
load_dotenv() # take environment variables from .env.
import asyncio
import threading
import os
import pyaudio
from starlette.websockets import WebSocket
from queue import Queue
from pynput import keyboard
import json
import traceback
import websockets
import queue
import pydub
import ast
from pydub import AudioSegment
from pydub.playback import play
import io
import time
import wave
import tempfile
from datetime import datetime
from interpreter import interpreter # Just for code execution. Maybe we should let people do from interpreter.computer import run?
from utils.kernel import put_kernel_messages_into_queue
from utils.get_system_info import get_system_info
from stt import stt_wav
from utils.logs import setup_logging
from utils.logs import logger
# Configuration for Audio Recording
CHUNK = 1024 # Record in chunks of 1024 samples
FORMAT = pyaudio.paInt16 # 16 bits per sample
CHANNELS = 1 # Mono
RATE = 44100 # Sample rate
RECORDING = False # Flag to control recording state
SPACEBAR_PRESSED = False # Flag to track spacebar press state
# Specify OS
current_platform = get_system_info()
# Initialize PyAudio
p = pyaudio.PyAudio()
def record_audio():
if os.getenv('STT_RUNNER') == "server":
# STT will happen on the server. we're sending audio.
send_queue.put({"role": "user", "type": "audio", "format": "audio/wav", "start": True})
elif os.getenv('STT_RUNNER') == "device":
# STT will happen here, on the device. we're sending text.
send_queue.put({"role": "user", "type": "message", "start": True})
raise Exception("STT_RUNNER must be set to either 'device' or 'server'.")
"""Record audio from the microphone and add it to the queue."""
stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)
logger.info("Recording started...")
# Create a temporary WAV file to store the audio data
temp_dir = tempfile.gettempdir()
wav_path = os.path.join(temp_dir, f"audio_{datetime.now().strftime('%Y%m%d%H%M%S%f')}.wav")
wav_file = wave.open(wav_path, 'wb')
data = stream.read(CHUNK, exception_on_overflow=False)
logger.info("Recording stopped.")
duration = wav_file.getnframes() / RATE
if duration < 0.3:
# Just pressed it. Send stop message
if os.getenv('STT_RUNNER') == "device":
send_queue.put({"role": "user", "type": "message", "content": "stop"})
send_queue.put({"role": "user", "type": "message", "end": True})
send_queue.put({"role": "user", "type": "audio", "format": "audio/wav", "content": ""})
send_queue.put({"role": "user", "type": "audio", "format": "audio/wav", "end": True})
if os.getenv('STT_RUNNER') == "device":
# Run stt then send text
text = stt_wav(wav_path)
send_queue.put({"role": "user", "type": "message", "content": text})
send_queue.put({"role": "user", "type": "message", "end": True})
# Stream audio
with open(wav_path, 'rb') as audio_file:
byte_data = audio_file.read(CHUNK)
while byte_data:
send_queue.put({"role": "user", "type": "audio", "format": "audio/wav", "content": str(byte_data)})
byte_data = audio_file.read(CHUNK)
send_queue.put({"role": "user", "type": "audio", "format": "audio/wav", "end": True})
if os.path.exists(wav_path):
def toggle_recording(state):
"""Toggle the recording state."""
if state and not SPACEBAR_PRESSED:
elif not state and SPACEBAR_PRESSED:
def on_press(key):
"""Detect spacebar press."""
if key == keyboard.Key.space:
def on_release(key):
"""Detect spacebar release and CTRL-C key press."""
if key == keyboard.Key.space:
elif key == keyboard.Key.esc:
import asyncio
send_queue = queue.Queue()
async def message_sender(websocket):
while True:
message = await asyncio.get_event_loop().run_in_executor(None, send_queue.get)
await websocket.send(json.dumps(message))
async def websocket_communication(WS_URL):
while True:
async with websockets.connect(WS_URL) as websocket:
logger.info("Press the spacebar to start/stop recording. Press ESC to exit.")
initial_message = {"role": None, "type": None, "format": None, "content": None}
message_so_far = initial_message
while True:
message = await websocket.recv()
logger.debug(f"Got this message from the server: {type(message)} {message}")
if type(message) == str:
message = json.loads(message)
if message.get("end"):
logger.debug(f"Complete message from the server: {message_so_far}")
message_so_far = initial_message
if "content" in message:
print(message['content'], end="", flush=True)
if any(message_so_far[key] != message[key] for key in message_so_far if key != "content"):
message_so_far = message
message_so_far["content"] += message["content"]
if message["type"] == "audio" and "content" in message:
audio_bytes = bytes(ast.literal_eval(message["content"]))
# Convert bytes to audio file
audio_file = io.BytesIO(audio_bytes)
audio = AudioSegment.from_mp3(audio_file)
# Play the audio
await asyncio.sleep(1)
# Run the code if that's the device's job
if os.getenv('CODE_RUNNER') == "device":
if message["type"] == "code" and "end" in message:
language = message_so_far["format"]
code = message_so_far["content"]
result = interpreter.computer.run(language, code)
# traceback.print_exc()
logger.info(f"Connecting to `{WS_URL}`...")
await asyncio.sleep(2)
if __name__ == "__main__":
async def main():
# Configuration for WebSocket
if not WS_URL:
raise ValueError("The environment variable SERVER_CONNECTION_URL is not set. Please set it to proceed.")
# Start the WebSocket communication
# Start watching the kernel if it's your job to do that
if os.getenv('CODE_RUNNER') == "device":
#If Raspberry Pi, add the button listener, otherwise use the spacebar
if current_platform.startswith("raspberry-pi"):
logger.info("Raspberry Pi detected, using button on GPIO pin 15")
# Use GPIO pin 15
pindef = ["gpiochip4", "15"] # gpiofind PIN15
print("PINDEF", pindef)
# HACK: needs passwordless sudo
process = await asyncio.create_subprocess_exec("sudo", "gpiomon", "-brf", *pindef, stdout=asyncio.subprocess.PIPE)
while True:
line = await process.stdout.readline()
if line:
line = line.decode().strip()
if "FALLING" in line:
elif "RISING" in line:
# Keyboard listener for spacebar press/release
listener = keyboard.Listener(on_press=on_press, on_release=on_release)
@ -1,103 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Rotating Glowing Circle</title>
html {
height: 100%;
margin: 0;
display: flex;
justify-content: center;
align-items: center;
background-color: black;
.circles {
margin: 0;
display: flex;
justify-content: center;
align-items: center;
width: 200px;
height: 200px;
border-radius: 50%;
animation: rotator 48s linear infinite;
.center-circle {
position: absolute;
width: 200px;
height: 200px;
border: 1px solid white;
border-radius: 50%;
background-color: transparent;
.center-circle-2 {
position: absolute;
width: 190px;
height: 190px;
opacity: 0.2;
border: 1px solid white;
border-radius: 50%;
background-color: transparent;
.glow-circle {
position: absolute;
width: 250px;
height: 250px;
border-radius: 50%;
background-color: transparent;
box-shadow: 0 0 60px 30px black;
/* Initial position of the glow circle, offset from the center */
top: 50%;
left: 50%;
margin-top: -125px;
/* Half the height of the circle */
margin-left: -125px;
/* Half the width of the circle */
/* Animation properties */
animation: rotateAround 6s linear infinite;
@keyframes rotateAround {
0% {
transform: translateX(240px) translateY(240px);
50% {
transform: translateX(0px) translateY(0px);
100% {
transform: translateX(-240px) translateY(-240px);
@keyframes rotator {
0% {
transform: rotate(0deg);
100% {
transform: rotate(360deg);
<div class="circles">
<div class="center-circle"></div>
<div class="glow-circle"></div>
<div class="center-circle-2"></div>
@ -1,16 +0,0 @@
<div class="centered-circle"></div>
ws = new WebSocket("ws://localhost/server")
ws.onmessage = event => {
if (event.data == "user_start_message") {
document.body.style.backgroundColor = "white"
.style.backgroundColor = "black"
} else if (event.data == "user_end_message") {
document.body.style.backgroundColor = "black"
.style.backgroundColor = "white"
@ -1,103 +0,0 @@
import redis
import RPi.GPIO as GPIO
import asyncio
import websockets
import sounddevice as sd
import numpy as np
import time
import re
def transcribe(audio_chunks):
pass # (todo)
def say(text):
# This should immediatly stop if button is pressed (if GPIO.input(18))
pass # (todo)
# Connect to button
GPIO.setup(18, GPIO.IN, pull_up_down=GPIO.PUD_UP)
# Set the duration and sample rate for the mic
chunk_duration = 0.5 # seconds
sample_rate = 44100 # Hz
# Set up Redis connection
r = redis.Redis(host='localhost', port=6379, db=0)
# Set up websocket connection
websocket = websockets.connect('ws://localhost:8765')
# This is so we only say() full sentences
accumulated_text = ""
def is_full_sentence(text):
return text.endswith(('.', '!', '?'))
def split_into_sentences(text):
return re.split(r'(?<=[.!?])\s+', text)
async def send_to_websocket(message):
async with websocket as ws:
await ws.send(message)
async def check_websocket():
async with websocket as ws:
message = await ws.recv()
return message
def main():
while True:
# If the button is pushed down
if not GPIO.input(18):
# Tell websocket and core that the user is speaking
send_to_websocket({"role": "user", "type": "message", "start": True}) # Standard start flag, required per streaming LMC protocol (https://docs.openinterpreter.com/guides/streaming-response)
r.rpush('to_core', {"role": "user", "type": "message", "content": "stop"}) # Custom stop message. Core is not streaming LMC (it's static LMC) so doesn't require that ^ flag
# Record audio from the microphone in chunks
audio_chunks = []
# Continue recording until the button is released
while not GPIO.input(18):
chunk = sd.rec(int(chunk_duration * sample_rate), samplerate=sample_rate, channels=2)
sd.wait() # Wait until recording is finished
# Transcribe
text = transcribe(audio_chunks)
message = {"role": "user", "type": "message", "content": text, "time": time.time()}
# Send message to core and websocket
r.rpush('to_core', message)
# Send user message end flag to websocket, required per streaming LMC protocol
send_to_websocket({"role": "user", "type": "message", "end": True})
# Send out anything in the to_interface queue
chunk = r.lpop('to_interface')
if chunk:
accumulated_text += chunk["content"]
# Speak full sentences out loud
sentences = split_into_sentences(accumulated_text)
if is_full_sentence(sentences[-1]):
for sentence in sentences:
accumulated_text = ""
for sentence in sentences[:-1]:
accumulated_text = sentences[-1]
accumulated_text = ""
message = check_websocket()
if message:
r.rpush('to_core', message)
if __name__ == "__main__":
@ -1,57 +0,0 @@
Listens to chunks of audio recorded by user.
Run `python listen.py` to start the server, then `cd user` and run `python record.py` to record audio.
from fastapi import FastAPI, WebSocket
import uvicorn
import json
from stt import stt
import tempfile
app = FastAPI()
async def user(ws: WebSocket):
await ws.accept()
audio_file = bytearray()
mime_type = None
while True:
message = await ws.receive()
if message['type'] == 'websocket.disconnect':
if message['type'] == 'websocket.receive':
if 'text' in message:
control_message = json.loads(message['text'])
if control_message.get('action') == 'command' and control_message.get('state') == 'start' and 'mimeType' in control_message:
# This indicates the start of a new audio file
mime_type = control_message.get('mimeType')
elif control_message.get('action') == 'command' and control_message.get('state') == 'end':
# This indicates the end of the audio file
# Process the complete audio file here
transcription = stt(audio_file, mime_type)
await ws.send_json({"transcript": transcription})
# Reset the bytearray for the next audio file
audio_file = bytearray()
mime_type = None
elif 'bytes' in message:
# If it's not a control message, it's part of the audio file
except Exception as e:
print(f"WebSocket connection closed with exception: {e}")
await ws.close()
print("WebSocket connection closed")
if __name__ == "__main__":
with tempfile.TemporaryDirectory():
uvicorn.run(app, host="", port=8000)
@ -1,146 +0,0 @@
Handles everything the user interacts through.
Connects to a websocket at /user. Sends shit to it, and displays/plays the shit it sends back.
For now, just handles a spacebar being pressed— for the duration it's pressed,
it should record audio.
import os
import pyaudio
import threading
import asyncio
import websocket
import time
import json
from pynput import keyboard
import wave
import tempfile
from datetime import datetime
# Configuration
chunk = 1024 # Record in chunks of 1024 samples
sample_format = pyaudio.paInt16 # 16 bits per sample
channels = 1 # Stereo
fs = 48000 # Sample rate
p = pyaudio.PyAudio() # Create an interface to PortAudio
frames = [] # Initialize array to store frames
recording = False # Flag to control recording state
ws_chunk_size = 4096 # Websocket stream chunk size
port = os.getenv('ASSISTANT_PORT', 8000)
ws_url = f"ws://localhost:{port}/user"
while True:
ws = websocket.create_connection(ws_url)
except ConnectionRefusedError:
async def start_recording():
global recording
if recording:
return # Avoid multiple starts
recording = True
frames.clear() # Clear existing frames
stream = p.open(format=sample_format,
print("Recording started...")
async with websockets.connect("ws://localhost:8000/user") as websocket:
# Send the start command with mime type
await websocket.send(json.dumps({"role": "user", "type": "audio", "format": "audio/wav", "start": True}))
while recording:
data = stream.read(chunk)
file_path = save_recording(frames)
with open(file_path, 'rb') as audio_file:
byte_chunk = audio_file.read(ws_chunk_size)
while byte_chunk:
await websocket.send(json.dumps({"role": "user", "type": "audio", "format": "audio/wav", "content": str(byte_chunk)}))
byte_chunk = audio_file.read(ws_chunk_size)
# Send the end command
await websocket.send(json.dumps({"role": "user", "type": "audio", "format": "audio/wav", "end": True}))
# Receive a json message and then close the connection
message = await websocket.recv()
print("Received message:", json.loads(message))
print("Recording stopped.")
def save_recording(frames) -> str:
# Save the recorded data as a WAV file
temp_dir = tempfile.gettempdir()
# Create a temporary file with the appropriate extension
output_path = os.path.join(temp_dir, f"input_{datetime.now().strftime('%Y%m%d%H%M%S%f')}.wav")
with wave.open(output_path, 'wb') as wf:
return output_path
def start_recording_sync():
# Create a new event loop for the thread
loop = asyncio.new_event_loop()
# Run the asyncio event loop
def stop_recording():
global recording
recording = False
print("Stopped recording")
def toggle_recording():
global recording
if recording:
# Start recording in a new thread to avoid blocking
print("Starting recording")
is_space_pressed = False # Flag to track the state of the spacebar
def on_press(key):
global is_space_pressed
if key == keyboard.Key.space and not is_space_pressed:
is_space_pressed = True
def on_release(key):
global is_space_pressed
if key == keyboard.Key.space and is_space_pressed:
is_space_pressed = False
if key == keyboard.Key.esc:
# Stop listener
return False
# Collect events until released
with keyboard.Listener(on_press=on_press, on_release=on_release) as listener:
with tempfile.TemporaryDirectory():
print("Press the spacebar to start/stop recording. Press ESC to exit.")
@ -1,32 +0,0 @@
Exposes a SSE streaming server endpoint at /run, which recieves language and code,
and streams the output.
from dotenv import load_dotenv
load_dotenv() # take environment variables from .env.
import os
import json
from interpreter import interpreter
import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
class Code(BaseModel):
language: str
code: str
app = FastAPI()
async def run_code(code: Code):
def generator():
for chunk in interpreter.computer.run(code.language, code.code):
yield json.dumps(chunk)
return StreamingResponse(generator())
if __name__ == "__main__":
uvicorn.run(app, host="", port=int(os.getenv('COMPUTER_PORT', 9000)))
File diff suppressed because one or more lines are too long
Reference in new issue