Merge pull request #279 from benxu3/async-interpreter

killian 7 months ago committed by GitHub
commit bd5774d1b5

@ -127,7 +127,9 @@ If you want to run local speech-to-text using Whisper, you must install Rust. Fo
## Customizations
To customize the behavior of the system, edit the [system message, model, skills library path,](https://docs.openinterpreter.com/settings/all-settings) etc. in `i.py`. This file sets up an interpreter, and is powered by Open Interpreter.
To customize the behavior of the system, edit the [system message, model, skills library path,](https://docs.openinterpreter.com/settings/all-settings) etc. in the `profiles` directory under the `server` directory. Each profile file sets up an interpreter, and is powered by Open Interpreter.
To specify the text-to-speech service for the 01 `base_device.py`, set `interpreter.tts` to either "openai" for OpenAI, "elevenlabs" for ElevenLabs, or "coqui" for Coqui (local) in a profile. For the 01 Light, set `SPEAKER_SAMPLE_RATE` to 24000 for Coqui (local) or 22050 for OpenAI TTS. We currently don't support ElevenLabs TTS on the 01 Light.
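For reference, a profile is just a Python module that configures the shared `interpreter` object before the server imports it. A minimal sketch of the TTS setting described above (the surrounding settings are illustrative; see `profiles/default.py` for the full version, and note that `SPEAKER_SAMPLE_RATE` lives in the 01 Light client code, not in the profile):

```python
# In a profile module under the server's profiles directory (sketch only)
from interpreter import interpreter

# Text-to-speech provider: "openai", "elevenlabs", or "coqui" (local)
interpreter.tts = "openai"
```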
## Ubuntu Dependencies

software/poetry.lock (generated, 5844 lines changed)

File diff suppressed because one or more lines are too long

@ -28,13 +28,27 @@ psutil = "^5.9.8"
typer = "^0.9.0"
platformdirs = "^4.2.0"
rich = "^13.7.1"
open-interpreter = {extras = ["os"], version = "^0.2.5"}
dateparser = "^1.2.0"
pytimeparse = "^1.1.8"
python-crontab = "^3.0.0"
inquirer = "^3.2.4"
pyqrcode = "^1.2.1"
realtimestt = "^0.1.12"
realtimetts = "^0.4.1"
keyboard = "^0.13.5"
pyautogui = "^0.9.54"
ctranslate2 = "4.1.0"
py3-tts = "^3.5"
elevenlabs = "1.2.2"
groq = "^0.5.0"
open-interpreter = {extras = ["os"], version = "^0.2.6"}
litellm = "1.35.35"
openai = "1.30.5"
pywebview = "*"
pyobjc = "*"
sentry-sdk = "^2.4.0"
plyer = "^2.1.0"
pywinctl = "^0.3"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

@ -1,5 +1,6 @@
; Config for Pytest Runner.
; suppress DeprecationWarning and UserWarning so they don't spam the interface, but check periodically
[pytest]
python_files = tests.py test_*.py
filterwarnings =

@ -2,6 +2,7 @@ from dotenv import load_dotenv
load_dotenv() # take environment variables from .env.
import subprocess
import os
import sys
import asyncio
@ -46,7 +47,7 @@ accumulator = Accumulator()
CHUNK = 1024 # Record in chunks of 1024 samples
FORMAT = pyaudio.paInt16 # 16 bits per sample
CHANNELS = 1 # Mono
RATE = 44100 # Sample rate
RATE = 16000 # Sample rate
RECORDING = False # Flag to control recording state
SPACEBAR_PRESSED = False # Flag to track spacebar press state
@ -60,12 +61,18 @@ CAMERA_WARMUP_SECONDS = float(os.getenv("CAMERA_WARMUP_SECONDS", 0))
# Specify OS
current_platform = get_system_info()
def is_win11():
return sys.getwindowsversion().build >= 22000
def is_win10():
try:
return platform.system() == "Windows" and "10" in platform.version() and not is_win11()
return (
platform.system() == "Windows"
and "10" in platform.version()
and not is_win11()
)
except:
return False
@ -80,9 +87,10 @@ class Device:
def __init__(self):
self.pressed_keys = set()
self.captured_images = []
self.audiosegments = []
self.audiosegments = asyncio.Queue()
self.server_url = ""
self.ctrl_pressed = False
self.tts_service = ""
def fetch_image_from_camera(self, camera_index=CAMERA_DEVICE_INDEX):
"""Captures an image from the specified camera device and saves it to a temporary file. Adds the image to the captured_images list."""
@ -144,11 +152,25 @@ class Device:
async def play_audiosegments(self):
"""Plays them sequentially."""
mpv_command = ["mpv", "--no-cache", "--no-terminal", "--", "fd://0"]
mpv_process = subprocess.Popen(
mpv_command,
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
while True:
try:
for audio in self.audiosegments:
audio = await self.audiosegments.get()
if self.tts_service == "elevenlabs":
mpv_process.stdin.write(audio) # type: ignore
mpv_process.stdin.flush() # type: ignore
else:
play(audio)
self.audiosegments.remove(audio)
await asyncio.sleep(0.1)
except asyncio.exceptions.CancelledError:
# This happens once at the start?
@ -267,19 +289,18 @@ class Device:
def on_press(self, key):
"""Detect spacebar press and Ctrl+C combination."""
self.pressed_keys.add(key) # Add the pressed key to the set
if keyboard.Key.space in self.pressed_keys:
self.toggle_recording(True)
elif {keyboard.Key.ctrl, keyboard.KeyCode.from_char('c')} <= self.pressed_keys:
elif {keyboard.Key.ctrl, keyboard.KeyCode.from_char("c")} <= self.pressed_keys:
logger.info("Ctrl+C pressed. Exiting...")
kill_process_tree()
os._exit(0)
# Windows alternative to the above
if key == keyboard.Key.ctrl_l:
self.ctrl_pressed = True
try:
if key.vk == 67 and self.ctrl_pressed:
logger.info("Ctrl+C pressed. Exiting...")
@ -289,17 +310,17 @@ class Device:
except:
pass
def on_release(self, key):
"""Detect spacebar release and 'c' key press for camera, and handle key release."""
self.pressed_keys.discard(key) # Remove the released key from the key press tracking set
self.pressed_keys.discard(
key
) # Remove the released key from the key press tracking set
if key == keyboard.Key.ctrl_l:
self.ctrl_pressed = False
if key == keyboard.Key.space:
self.toggle_recording(False)
elif CAMERA_ENABLED and key == keyboard.KeyCode.from_char('c'):
elif CAMERA_ENABLED and key == keyboard.KeyCode.from_char("c"):
self.fetch_image_from_camera()
async def message_sender(self, websocket):
@ -332,35 +353,48 @@ class Device:
chunk = await websocket.recv()
logger.debug(f"Got this message from the server: {type(chunk)} {chunk}")
# print("received chunk from server")
if type(chunk) == str:
chunk = json.loads(chunk)
message = accumulator.accumulate(chunk)
if chunk.get("type") == "config":
self.tts_service = chunk.get("tts_service")
continue
if self.tts_service == "elevenlabs":
message = chunk
else:
message = accumulator.accumulate(chunk)
if message == None:
# Will be None until we have a full message ready
continue
# At this point, we have our message
if message["type"] == "audio" and message["format"].startswith("bytes"):
if isinstance(message, bytes) or (
message["type"] == "audio" and message["format"].startswith("bytes")
):
# Convert bytes to audio file
audio_bytes = message["content"]
# Create an AudioSegment instance with the raw data
audio = AudioSegment(
# raw audio data (bytes)
data=audio_bytes,
# signed 16-bit little-endian format
sample_width=2,
# 16,000 Hz frame rate
frame_rate=16000,
# mono sound
channels=1,
)
self.audiosegments.append(audio)
if self.tts_service == "elevenlabs":
audio_bytes = message
audio = audio_bytes
else:
audio_bytes = message["content"]
# Create an AudioSegment instance with the raw data
audio = AudioSegment(
# raw audio data (bytes)
data=audio_bytes,
# signed 16-bit little-endian format
sample_width=2,
# 16,000 Hz frame rate
frame_rate=22050,
# mono sound
channels=1,
)
await self.audiosegments.put(audio)
# Run the code if that's the client's job
if os.getenv("CODE_RUNNER") == "client":
@ -369,7 +403,7 @@ class Device:
code = message["content"]
result = interpreter.computer.run(language, code)
send_queue.put(result)
if is_win10():
logger.info("Windows 10 detected")
# Workaround for Windows 10 not latching to the websocket server.
@ -399,6 +433,7 @@ class Device:
# Start watching the kernel if it's your job to do that
if os.getenv("CODE_RUNNER") == "client":
# client is not running code!
asyncio.create_task(put_kernel_messages_into_queue(send_queue))
asyncio.create_task(self.play_audiosegments())
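The change above replaces the `audiosegments` list with an `asyncio.Queue`, so playback becomes a producer/consumer pair: `message_receiver` puts decoded audio on the queue and `play_audiosegments` awaits the next item instead of polling and removing from a list. A stripped-down sketch of that pattern, independent of the 01 codebase (all names are illustrative):

```python
import asyncio

async def producer(queue: asyncio.Queue):
    # Stands in for message_receiver(): push decoded audio onto the queue.
    for i in range(3):
        await queue.put(f"chunk-{i}")
    await queue.put(None)  # sentinel so the consumer knows to stop

async def consumer(queue: asyncio.Queue):
    # Stands in for play_audiosegments(): block until the next item is ready.
    while True:
        item = await queue.get()
        if item is None:
            break
        print("playing", item)

async def main():
    queue = asyncio.Queue()
    await asyncio.gather(producer(queue), consumer(queue))

asyncio.run(main())
```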

File diff suppressed because it is too large

File diff suppressed because it is too large

@ -12,25 +12,25 @@
"dependencies": {
"@react-navigation/native": "^6.1.14",
"@react-navigation/native-stack": "^6.9.22",
"expo": "~51.0.8",
"expo-av": "~14.0.5",
"expo-barcode-scanner": "~13.0.1",
"expo-camera": "~15.0.9",
"expo-haptics": "~13.0.1",
"expo": "~50.0.8",
"expo-av": "~13.10.5",
"expo-barcode-scanner": "~12.9.3",
"expo-camera": "~14.0.5",
"expo-haptics": "~12.8.1",
"expo-permissions": "^14.4.0",
"expo-status-bar": "~1.12.1",
"expo-status-bar": "~1.11.1",
"react": "18.2.0",
"react-native": "0.74.1",
"react-native": "0.73.4",
"react-native-base64": "^0.2.1",
"react-native-polyfill-globals": "^3.1.0",
"react-native-safe-area-context": "4.10.1",
"react-native-screens": "~3.31.1",
"react-native-safe-area-context": "4.8.2",
"react-native-screens": "~3.29.0",
"text-encoding": "^0.7.0",
"zustand": "^4.5.2"
},
"devDependencies": {
"@babel/core": "^7.20.0",
"@types/react": "~18.2.79",
"@types/react": "~18.2.45",
"@types/react-native-base64": "^0.2.2",
"typescript": "^5.1.3"
},

@ -1,13 +1,12 @@
import React, { useState } from "react";
import { StyleSheet, Text, TouchableOpacity, View } from "react-native";
import { CameraView, useCameraPermissions } from "expo-camera";
import { Camera } from "expo-camera";
import { useNavigation } from "@react-navigation/native";
import { BarCodeScanner } from "expo-barcode-scanner";
// import useSoundEffect from "../lib/useSoundEffect";
export default function CameraScreen() {
const [permission, requestPermission] = useCameraPermissions();
const [permission, requestPermission] = Camera.useCameraPermissions();
// const playYay = useSoundEffect(require("../../assets/yay.wav"));
const [scanned, setScanned] = useState(false);
@ -52,12 +51,12 @@ export default function CameraScreen() {
};
return (
<View style={styles.container}>
<CameraView
<Camera
style={styles.camera}
facing={"back"}
onBarcodeScanned={scanned ? undefined : handleBarCodeScanned}
barcodeScannerSettings={{
barcodeTypes: ["qr"],
onBarCodeScanned={scanned ? undefined : handleBarCodeScanned}
barCodeScannerSettings={{
barCodeTypes: [BarCodeScanner.Constants.BarCodeType.qr],
}}
>
<View style={styles.buttonContainer}>
@ -75,7 +74,7 @@ export default function CameraScreen() {
</TouchableOpacity>
)}
</View>
</CameraView>
</Camera>
</View>
);
}

@ -5,6 +5,7 @@ import {
TouchableOpacity,
StyleSheet,
BackHandler,
ScrollView,
} from "react-native";
import * as FileSystem from "expo-file-system";
import { Audio } from "expo-av";
@ -52,7 +53,30 @@ const Main: React.FC<MainProps> = ({ route }) => {
inputRange: [0, 1],
outputRange: ["white", "black"],
});
const [accumulatedMessage, setAccumulatedMessage] = useState<string>("");
const scrollViewRef = useRef<ScrollView>(null);
/**
* Checks if audioDir exists in device storage, if not creates it.
*/
async function dirExists() {
try {
const dirInfo = await FileSystem.getInfoAsync(audioDir);
if (!dirInfo.exists) {
console.error("audio directory doesn't exist, creating...");
await FileSystem.makeDirectoryAsync(audioDir, { intermediates: true });
}
} catch (error) {
console.error("Error checking or creating directory:", error);
}
}
/**
* Writes the buffer to a temp file in audioDir in base64 encoding.
*
* @param {string} buffer
* @returns tempFilePath or null
*/
const constructTempFilePath = async (buffer: string) => {
try {
await dirExists();
@ -73,21 +97,10 @@ const Main: React.FC<MainProps> = ({ route }) => {
}
};
async function dirExists() {
/**
* Checks if audio directory exists in device storage, if not creates it.
*/
try {
const dirInfo = await FileSystem.getInfoAsync(audioDir);
if (!dirInfo.exists) {
console.error("audio directory doesn't exist, creating...");
await FileSystem.makeDirectoryAsync(audioDir, { intermediates: true });
}
} catch (error) {
console.error("Error checking or creating directory:", error);
}
}
/**
* Plays the next audio in audioQueue if the queue is not empty
* and there is no currently playing audio.
*/
const playNextAudio = useCallback(async () => {
if (audioQueueRef.current.length > 0 && soundRef.current == null) {
const uri = audioQueueRef.current.at(0) as string;
@ -107,6 +120,11 @@ const Main: React.FC<MainProps> = ({ route }) => {
}
},[]);
/**
* Queries the currently playing Expo Audio.Sound object soundRef
* for playback status. When the status denotes soundRef has finished
* playback, we unload the sound and call playNextAudio().
*/
const _onPlayBackStatusUpdate = useCallback(
async (status: any) => {
if (status.didJustFinish) {
@ -121,6 +139,9 @@ const Main: React.FC<MainProps> = ({ route }) => {
}
},[]);
/**
* Single swipe to return to the Home screen from the Main page.
*/
useEffect(() => {
const backAction = () => {
navigation.navigate("Home"); // Always navigate back to Home
@ -136,6 +157,9 @@ const Main: React.FC<MainProps> = ({ route }) => {
return () => backHandler.remove();
}, [navigation]);
/**
* Handles all WebSocket events
*/
useEffect(() => {
let websocket: WebSocket;
try {
@ -151,6 +175,10 @@ const Main: React.FC<MainProps> = ({ route }) => {
websocket.onmessage = async (e) => {
try {
const message = JSON.parse(e.data);
if (message.content && message.type == "message" && message.role == "assistant"){
setAccumulatedMessage((prevMessage) => prevMessage + message.content);
scrollViewRef.current?.scrollToEnd({ animated: true });
}
if (message.content && message.type == "audio") {
const buffer = message.content;
@ -198,7 +226,18 @@ const Main: React.FC<MainProps> = ({ route }) => {
return (
<Animated.View style={[styles.container, { backgroundColor }]}>
<View style={styles.middle}>
<View style={{flex: 6, alignItems: "center", justifyContent: "center",}}>
<ScrollView
ref={scrollViewRef}
style={styles.scrollViewContent}
showsVerticalScrollIndicator={false}
>
<Text style={styles.accumulatedMessage}>
{accumulatedMessage}
</Text>
</ScrollView>
</View>
<View style={{flex: 2, justifyContent: "center", alignItems: "center",}}>
<RecordButton
playPip={playPip}
playPop={playPop}
@ -211,6 +250,8 @@ const Main: React.FC<MainProps> = ({ route }) => {
buttonBackgroundColor={buttonBackgroundColor}
setIsPressed={setIsPressed}
/>
</View>
<View style={{flex: 1}}>
<TouchableOpacity
style={styles.statusButton}
onPress={() => {
@ -238,39 +279,7 @@ const Main: React.FC<MainProps> = ({ route }) => {
const styles = StyleSheet.create({
container: {
flex: 1,
position: "relative",
},
middle: {
flex: 1,
justifyContent: "center",
alignItems: "center",
padding: 10,
position: "relative",
},
circle: {
width: 100,
height: 100,
borderRadius: 50,
justifyContent: "center",
alignItems: "center",
},
qr: {
position: "absolute",
top: 30,
left: 10,
padding: 10,
zIndex: 100,
},
icon: {
height: 40,
width: 40,
},
topBar: {
height: 40,
backgroundColor: "#000",
paddingTop: 50,
},
statusText: {
fontSize: 12,
fontWeight: "bold",
@ -280,6 +289,22 @@ const styles = StyleSheet.create({
bottom: 20,
alignSelf: "center",
},
accumulatedMessage: {
margin: 20,
fontSize: 15,
textAlign: "left",
color: "white",
paddingBottom: 30,
fontFamily: "monospace",
},
scrollViewContent: {
padding: 25,
width: "90%",
maxHeight: "80%",
borderWidth: 5,
borderColor: "white",
borderRadius: 10,
},
});
export default Main;

@ -0,0 +1,220 @@
# This is a websocket interpreter, TTS and STT disabled.
# It makes a websocket on port 8000 that sends/receives LMC messages in *streaming* format.
### You MUST send a start and end flag with each message! For example: ###
"""
{"role": "user", "type": "message", "start": True})
{"role": "user", "type": "message", "content": "hi"})
{"role": "user", "type": "message", "end": True})
"""
###
from pynput import keyboard
from .utils.bytes_to_wav import bytes_to_wav
from RealtimeTTS import TextToAudioStream, CoquiEngine, OpenAIEngine, ElevenlabsEngine
from RealtimeSTT import AudioToTextRecorder
import time
import asyncio
import json
import os
class AsyncInterpreter:
def __init__(self, interpreter):
self.interpreter = interpreter
self.audio_chunks = []
# STT
self.stt = AudioToTextRecorder(
model="tiny.en", spinner=False, use_microphone=False
)
self.stt.stop() # It needs this for some reason
# TTS
if self.interpreter.tts == "coqui":
engine = CoquiEngine()
elif self.interpreter.tts == "openai":
engine = OpenAIEngine()
elif self.interpreter.tts == "elevenlabs":
engine = ElevenlabsEngine(api_key=os.environ["ELEVEN_LABS_API_KEY"])
engine.set_voice("Michael")
else:
raise ValueError(f"Unsupported TTS engine: {self.interpreter.tts}")
self.tts = TextToAudioStream(engine)
self.active_chat_messages = []
self._input_queue = asyncio.Queue() # Queue that .input will shove things into
self._output_queue = asyncio.Queue() # Queue to put output chunks into
self._last_lmc_start_flag = None # Unix time of last LMC start flag received
self._in_keyboard_write_block = (
False # Tracks whether interpreter is trying to use the keyboard
)
self.loop = asyncio.get_event_loop()
async def _add_to_queue(self, queue, item):
await queue.put(item)
async def clear_queue(self, queue):
while not queue.empty():
await queue.get()
async def clear_input_queue(self):
await self.clear_queue(self._input_queue)
async def clear_output_queue(self):
await self.clear_queue(self._output_queue)
async def input(self, chunk):
"""
Expects a chunk in streaming LMC format.
"""
if isinstance(chunk, bytes):
# It's probably a chunk of audio
self.stt.feed_audio(chunk)
self.audio_chunks.append(chunk)
# print("INTERPRETER FEEDING AUDIO")
else:
try:
chunk = json.loads(chunk)
except:
pass
if "start" in chunk:
# print("Starting STT")
self.stt.start()
self._last_lmc_start_flag = time.time()
# self.interpreter.computer.terminal.stop() # Stop any code execution... maybe we should make interpreter.stop()?
elif "end" in chunk:
# print("Running OI on input")
asyncio.create_task(self.run())
else:
await self._add_to_queue(self._input_queue, chunk)
def add_to_output_queue_sync(self, chunk):
"""
Synchronous function to add a chunk to the output queue.
"""
# print("ADDING TO QUEUE:", chunk)
asyncio.create_task(self._add_to_queue(self._output_queue, chunk))
def generate(self, message, start_interpreter):
last_lmc_start_flag = self._last_lmc_start_flag
self.interpreter.messages = self.active_chat_messages
# print("message is", message)
for chunk in self.interpreter.chat(message, display=True, stream=True):
if self._last_lmc_start_flag != last_lmc_start_flag:
# self.beeper.stop()
break
# self.add_to_output_queue_sync(chunk) # To send text, not just audio
content = chunk.get("content")
# Handle message blocks
if chunk.get("type") == "message":
if content:
# self.beeper.stop()
# Experimental: The AI voice sounds better with replacements like these, but it should happen at the TTS layer
# content = content.replace(". ", ". ... ").replace(", ", ", ... ").replace("!", "! ... ").replace("?", "? ... ")
# print("yielding ", content)
yield content
# Handle code blocks
elif chunk.get("type") == "code":
if "start" in chunk:
# self.beeper.start()
pass
# Experimental: If the AI wants to type, we should type immediately
if (
self.interpreter.messages[-1]
.get("content", "")
.startswith("computer.keyboard.write(")
):
keyboard.controller.type(content)
self._in_keyboard_write_block = True
if "end" in chunk and self._in_keyboard_write_block:
self._in_keyboard_write_block = False
# (This will make it so it doesn't type twice when the block executes)
if self.interpreter.messages[-1]["content"].startswith(
"computer.keyboard.write("
):
self.interpreter.messages[-1]["content"] = (
"dummy_variable = ("
+ self.interpreter.messages[-1]["content"][
len("computer.keyboard.write(") :
]
)
# Send a completion signal
# self.add_to_output_queue_sync({"role": "server","type": "completion", "content": "DONE"})
async def run(self):
"""
Runs OI on the audio bytes submitted to the input. Will add streaming LMC chunks to the _output_queue.
"""
self.interpreter.messages = self.active_chat_messages
self.stt.stop()
input_queue = []
while not self._input_queue.empty():
input_queue.append(self._input_queue.get_nowait())
message = self.stt.text()
if self.audio_chunks:
audio_bytes = bytearray(b"".join(self.audio_chunks))
wav_file_path = bytes_to_wav(audio_bytes, "audio/raw")
print("wav_file_path ", wav_file_path)
self.audio_chunks = []
print(message)
# Feed generate to RealtimeTTS
self.add_to_output_queue_sync(
{"role": "assistant", "type": "audio", "format": "bytes.wav", "start": True}
)
start_interpreter = time.time()
text_iterator = self.generate(message, start_interpreter)
self.tts.feed(text_iterator)
if not self.tts.is_playing():
self.tts.play_async(on_audio_chunk=self.on_tts_chunk, muted=True)
while True:
await asyncio.sleep(0.1)
# print("is_playing", self.tts.is_playing())
if not self.tts.is_playing():
self.add_to_output_queue_sync(
{
"role": "assistant",
"type": "audio",
"format": "bytes.wav",
"end": True,
}
)
self.tts.stop()
break
async def _on_tts_chunk_async(self, chunk):
# print("adding chunk to queue")
await self._add_to_queue(self._output_queue, chunk)
def on_tts_chunk(self, chunk):
# print("ye")
asyncio.run(self._on_tts_chunk_async(chunk))
async def output(self):
# print("outputting chunks")
return await self._output_queue.get()
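For orientation, this is roughly how a caller drives `AsyncInterpreter` using the streaming LMC flags documented at the top of the file. It is a sketch only: the relative imports assume you are inside the server package (as `async_server.py` is), and the audio payload is a placeholder.

```python
import asyncio

from .profiles.default import interpreter as base_interpreter
from .async_interpreter import AsyncInterpreter

async def demo():
    oi = AsyncInterpreter(base_interpreter)

    # Streaming LMC input: start flag, raw audio bytes, end flag.
    await oi.input({"role": "user", "type": "audio", "format": "bytes.wav", "start": True})
    await oi.input(b"<raw microphone bytes>")  # placeholder payload
    await oi.input({"role": "user", "type": "audio", "format": "bytes.wav", "end": True})

    # Drain the output queue: dicts are LMC chunks, bytes are TTS audio.
    while True:
        chunk = await oi.output()
        if isinstance(chunk, dict) and chunk.get("end"):
            break

asyncio.run(demo())
```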

@ -0,0 +1,102 @@
# import from the profiles directory the interpreter to be served
# add other profiles to the directory to define other interpreter instances and import them here
# {.profiles.fast: optimizes for STT/TTS latency with the fastest models }
# {.profiles.local: uses local models and local STT/TTS }
# {.profiles.default: uses default interpreter settings with optimized TTS latency }
# from .profiles.fast import interpreter as base_interpreter
# from .profiles.local import interpreter as base_interpreter
from .profiles.default import interpreter as base_interpreter
import asyncio
import traceback
import json
from fastapi import FastAPI, WebSocket
from fastapi.responses import PlainTextResponse
from uvicorn import Config, Server
from .async_interpreter import AsyncInterpreter
from fastapi.middleware.cors import CORSMiddleware
from typing import List, Dict, Any
import os
os.environ["STT_RUNNER"] = "server"
os.environ["TTS_RUNNER"] = "server"
# interpreter.tts set in the profiles directory!!!!
interpreter = AsyncInterpreter(base_interpreter)
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"], # Allow all methods (GET, POST, etc.)
allow_headers=["*"], # Allow all headers
)
@app.get("/ping")
async def ping():
return PlainTextResponse("pong")
@app.websocket("/")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
# Send the tts_service value to the client
await websocket.send_text(
json.dumps({"type": "config", "tts_service": interpreter.interpreter.tts})
)
try:
async def receive_input():
while True:
if websocket.client_state == "DISCONNECTED":
break
data = await websocket.receive()
if isinstance(data, bytes):
await interpreter.input(data)
elif "bytes" in data:
await interpreter.input(data["bytes"])
# print("RECEIVED INPUT", data)
elif "text" in data:
# print("RECEIVED INPUT", data)
await interpreter.input(data["text"])
async def send_output():
while True:
output = await interpreter.output()
if isinstance(output, bytes):
# print(f"Sending {len(output)} bytes of audio data.")
await websocket.send_bytes(output)
# we dont send out bytes rn, no TTS
elif isinstance(output, dict):
# print("sending text")
await websocket.send_text(json.dumps(output))
await asyncio.gather(send_output(), receive_input())
except Exception as e:
print(f"WebSocket connection closed with exception: {e}")
traceback.print_exc()
finally:
if not websocket.client_state == "DISCONNECTED":
await websocket.close()
async def main(server_host, server_port):
print(f"Starting server on {server_host}:{server_port}")
config = Config(app, host=server_host, port=server_port, lifespan="on")
server = Server(config)
await server.serve()
if __name__ == "__main__":
asyncio.run(main())
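A minimal client for the endpoint above might look like the following sketch. It assumes the `websockets` package and a server listening locally on port 8000 (the port named in the module comment); the audio payload is a placeholder.

```python
import asyncio
import json

import websockets  # third-party dependency assumed for this sketch

async def main():
    async with websockets.connect("ws://localhost:8000/") as ws:
        # The server's first frame is the config message carrying tts_service.
        config = json.loads(await ws.recv())
        print("tts_service:", config.get("tts_service"))

        # Send a streaming LMC audio message: start flag, raw bytes, end flag.
        await ws.send(json.dumps({"role": "user", "type": "audio", "format": "bytes.wav", "start": True}))
        await ws.send(b"<raw microphone bytes>")  # placeholder payload
        await ws.send(json.dumps({"role": "user", "type": "audio", "format": "bytes.wav", "end": True}))

        # Read replies until the assistant's audio stream ends.
        while True:
            reply = await ws.recv()
            if isinstance(reply, bytes):
                continue  # TTS audio chunk
            if json.loads(reply).get("end"):
                break

asyncio.run(main())
```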

@ -1,8 +1,13 @@
# tests currently hang after completion
"""
import pytest
from source.server.i import configure_interpreter
from interpreter import OpenInterpreter
import signal
import os
from .profiles.default import interpreter
from async_interpreter import AsyncInterpreter
from fastapi.testclient import TestClient
from .server import app
from .async_server import app
@pytest.fixture
@ -12,5 +17,20 @@ def client():
@pytest.fixture
def mock_interpreter():
interpreter = configure_interpreter(OpenInterpreter())
return interpreter
async_interpreter = AsyncInterpreter(interpreter)
yield async_interpreter
async_interpreter.shutdown()
@pytest.fixture(scope="function", autouse=True)
def term_handler():
orig = signal.signal(signal.SIGTERM, signal.getsignal(signal.SIGINT))
yield
signal.signal(signal.SIGTERM, orig)
yield
# Send SIGTERM signal to the current process and its children
os.kill(os.getpid(), signal.SIGTERM)
"""

@ -1,366 +0,0 @@
from dotenv import load_dotenv
import os
load_dotenv() # take environment variables from .env.
import glob
import time
import json
from interpreter import OpenInterpreter
import shutil
system_message = r"""
You are the 01, a screenless executive assistant that can complete any task.
When you execute code, it will be executed on the user's machine. The user has given you full and complete permission to execute any code necessary to complete the task.
Run any code to achieve the goal, and if at first you don't succeed, try again and again.
You can install new packages.
Be concise. Your messages are being read aloud to the user. DO NOT MAKE PLANS. RUN CODE QUICKLY.
Try to spread complex tasks over multiple code blocks. Don't try to do complex tasks in one go.
Manually summarize text.
DON'T TELL THE USER THE METHOD YOU'LL USE, OR MAKE PLANS. ACT LIKE THIS:
---
user: Are there any concerts in Seattle?
assistant: Let me check on that.
```python
computer.browser.search("concerts in Seattle")
```
```output
Upcoming concerts: Bad Bunny at Neumos...
```
It looks like there's a Bad Bunny concert at Neumos...
---
Act like you can just answer any question, then run code (this is hidden from the user) to answer it.
THE USER CANNOT SEE CODE BLOCKS.
Your responses should be very short, no more than 1-2 sentences long.
DO NOT USE MARKDOWN. ONLY WRITE PLAIN TEXT.
# TASKS
Help the user manage their tasks.
Store the user's tasks in a Python list called `tasks`.
The user's current task list (it might be empty) is: {{ tasks }}
When the user completes the current task, you should remove it from the list and read the next item by running `tasks = tasks[1:]\ntasks[0]`. Then, tell the user what the next task is.
When the user tells you about a set of tasks, you should intelligently order tasks, batch similar tasks, and break down large tasks into smaller tasks (for this, you should consult the user and get their permission to break it down). Your goal is to manage the task list as intelligently as possible, to make the user as efficient and non-overwhelmed as possible. They will require a lot of encouragement, support, and kindness. Don't say too much about what's ahead of them; just try to focus them on each step at a time.
After starting a task, you should check in with the user around the estimated completion time to see if the task is completed.
To do this, schedule a reminder based on estimated completion time using the function `schedule(message="Your message here.", start="8am")`, WHICH HAS ALREADY BEEN IMPORTED. YOU DON'T NEED TO IMPORT THE `schedule` FUNCTION. IT IS AVAILABLE. You'll receive the message at the time you scheduled it. If the user says to monitor something, simply schedule it with an interval of a duration that makes sense for the problem by specifying an interval, like this: `schedule(message="Your message here.", interval="5m")`
If there are tasks, you should guide the user through their list one task at a time, convincing them to move forward, giving a pep talk if need be.
# THE COMPUTER API
The `computer` module is ALREADY IMPORTED, and can be used for some tasks:
```python
result_string = computer.browser.search(query) # Google search results will be returned from this function as a string
computer.calendar.create_event(title="Meeting", start_date=datetime.datetime.now(), end_date=datetime.datetime.now() + datetime.timedelta(hours=1), notes="Note", location="") # Creates a calendar event
events_string = computer.calendar.get_events(start_date=datetime.date.today(), end_date=None) # Get events between dates. If end_date is None, only gets events for start_date
computer.calendar.delete_event(event_title="Meeting", start_date=datetime.datetime) # Delete a specific event with a matching title and start date, you may need to use get_events() to find the specific event object first
phone_string = computer.contacts.get_phone_number("John Doe")
contact_string = computer.contacts.get_email_address("John Doe")
computer.mail.send("john@email.com", "Meeting Reminder", "Reminder that our meeting is at 3pm today.", ["path/to/attachment.pdf", "path/to/attachment2.pdf"]) # Send an email with optional attachments
emails_string = computer.mail.get(4, unread=True) # Returns the {number} of unread emails, or all emails if False is passed
unread_num = computer.mail.unread_count() # Returns the number of unread emails
computer.sms.send("555-123-4567", "Hello from the computer!") # Send a text message. MUST be a phone number, so use computer.contacts.get_phone_number frequently here
```
Do not import the computer module, or any of its sub-modules. They are already imported.
DO NOT use the computer module for ALL tasks. Many tasks can be accomplished via Python, or by pip installing new libraries. Be creative!
# GUI CONTROL (RARE)
You are a computer controlling language model. You can control the user's GUI.
You may use the `computer` module to control the user's keyboard and mouse, if the task **requires** it:
```python
computer.display.view() # Shows you what's on the screen, returns a `pil_image` in case you need it (rarely). **You almost always want to do this first!**
computer.keyboard.hotkey(" ", "command") # Opens spotlight
computer.keyboard.write("hello")
computer.mouse.click("text onscreen") # This clicks on the UI element with that text. Use this **frequently** and get creative! To click a video, you could pass the *timestamp* (which is usually written on the thumbnail) into this.
computer.mouse.move("open recent >") # This moves the mouse over the UI element with that text. Many dropdowns will disappear if you click them. You have to hover over items to reveal more.
computer.mouse.click(x=500, y=500) # Use this very, very rarely. It's highly inaccurate
computer.mouse.click(icon="gear icon") # Moves mouse to the icon with that description. Use this very often
computer.mouse.scroll(-10) # Scrolls down. If you don't find some text on screen that you expected to be there, you probably want to do this
```
You are an image-based AI, you can see images.
Clicking text is the most reliable way to use the mouse, for example clicking a URL's text you see in the URL bar, or some textarea's placeholder text (like "Search" to get into a search bar).
If you use `plt.show()`, the resulting image will be sent to you. However, if you use `PIL.Image.show()`, the resulting image will NOT be sent to you.
It is very important to make sure you are focused on the right application and window. Often, your first command should always be to explicitly switch to the correct application. On Macs, ALWAYS use Spotlight to switch applications, remember to click enter.
When searching the web, use query parameters. For example, https://www.amazon.com/s?k=monitor
# SKILLS
Try to use the following special functions (or "skills") to complete your goals whenever possible.
THESE ARE ALREADY IMPORTED. YOU CAN CALL THEM INSTANTLY.
---
{{
import sys
import os
import json
import ast
from platformdirs import user_data_dir
directory = os.path.join(user_data_dir('01'), 'skills')
if not os.path.exists(directory):
os.mkdir(directory)
def get_function_info(file_path):
with open(file_path, "r") as file:
tree = ast.parse(file.read())
functions = [node for node in tree.body if isinstance(node, ast.FunctionDef)]
for function in functions:
docstring = ast.get_docstring(function)
args = [arg.arg for arg in function.args.args]
print(f"Function Name: {function.name}")
print(f"Arguments: {args}")
print(f"Docstring: {docstring}")
print("---")
files = os.listdir(directory)
for file in files:
if file.endswith(".py"):
file_path = os.path.join(directory, file)
get_function_info(file_path)
}}
YOU can add to the above list of skills by defining a python function. The function will be saved as a skill.
Search all existing skills by running `computer.skills.search(query)`.
**Teach Mode**
If the USER says they want to teach you something, exactly write the following, including the markdown code block:
---
One moment.
```python
computer.skills.new_skill.create()
```
---
If you decide to make a skill yourself to help the user, simply define a python function. `computer.skills.new_skill.create()` is for user-described skills.
# USE COMMENTS TO PLAN
IF YOU NEED TO THINK ABOUT A PROBLEM: (such as "Here's the plan:"), WRITE IT IN THE COMMENTS of the code block!
---
User: What is 432/7?
Assistant: Let me think about that.
```python
# Here's the plan:
# 1. Divide the numbers
# 2. Round to 3 digits
print(round(432/7, 3))
```
```output
61.714
```
The answer is 61.714.
---
# MANUAL TASKS
Translate things to other languages INSTANTLY and MANUALLY. Don't ever try to use a translation tool.
Summarize things manually. DO NOT use a summarizer tool.
# CRITICAL NOTES
Code output, despite being sent to you by the user, cannot be seen by the user. You NEED to tell the user about the output of some code, even if it's exact. >>The user does not have a screen.<<
ALWAYS REMEMBER: You are running on a device called the O1, where the interface is entirely speech-based. Make your responses to the user VERY short. DO NOT PLAN. BE CONCISE. WRITE CODE TO RUN IT.
Try multiple methods before saying the task is impossible. **You can do it!**
""".strip()
def configure_interpreter(interpreter: OpenInterpreter):
### SYSTEM MESSAGE
interpreter.system_message = system_message
interpreter.llm.supports_vision = True
interpreter.shrink_images = True # Faster but less accurate
interpreter.llm.model = "gpt-4"
interpreter.llm.supports_functions = False
interpreter.llm.context_window = 110000
interpreter.llm.max_tokens = 4096
interpreter.auto_run = True
interpreter.force_task_completion = True
interpreter.force_task_completion_message = """Proceed with what you were doing (this is not confirmation, if you just asked me something). You CAN run code on my machine. If you want to run code, start your message with "```"! If the entire task is done, say exactly 'The task is done.' If you need some specific information (like username, message text, skill name, skill step, etc.) say EXACTLY 'Please provide more information.' If it's impossible, say 'The task is impossible.' (If I haven't provided a task, say exactly 'Let me know what you'd like to do next.') Otherwise keep going. CRITICAL: REMEMBER TO FOLLOW ALL PREVIOUS INSTRUCTIONS. If I'm teaching you something, remember to run the related `computer.skills.new_skill` function."""
interpreter.force_task_completion_breakers = [
"The task is done.",
"The task is impossible.",
"Let me know what you'd like to do next.",
"Please provide more information.",
]
# Check if required packages are installed
# THERE IS AN INCONSISTENCY HERE.
# We should be testing if they import WITHIN OI's computer, not here.
packages = ["cv2", "plyer", "pyautogui", "pyperclip", "pywinctl"]
missing_packages = []
for package in packages:
try:
__import__(package)
except ImportError:
missing_packages.append(package)
if missing_packages:
interpreter.display_message(
f"> **Missing Package(s): {', '.join(['`' + p + '`' for p in missing_packages])}**\n\nThese packages are required for OS Control.\n\nInstall them?\n"
)
user_input = input("(y/n) > ")
if user_input.lower() != "y":
print("\nPlease try to install them manually.\n\n")
time.sleep(2)
print("Attempting to start OS control anyway...\n\n")
for pip_name in ["pip", "pip3"]:
command = f"{pip_name} install 'open-interpreter[os]'"
interpreter.computer.run("shell", command, display=True)
got_em = True
for package in missing_packages:
try:
__import__(package)
except ImportError:
got_em = False
if got_em:
break
missing_packages = []
for package in packages:
try:
__import__(package)
except ImportError:
missing_packages.append(package)
if missing_packages != []:
print(
"\n\nWarning: The following packages could not be installed:",
", ".join(missing_packages),
)
print("\nPlease try to install them manually.\n\n")
time.sleep(2)
print("Attempting to start OS control anyway...\n\n")
# Should we explore other options for ^ these kinds of tags?
# Like:
# from rich import box
# from rich.console import Console
# from rich.panel import Panel
# console = Console()
# print(">\n\n")
# console.print(Panel("[bold italic white on black]OS CONTROL[/bold italic white on black] Enabled", box=box.SQUARE, expand=False), style="white on black")
# print(">\n\n")
# console.print(Panel("[bold italic white on black]OS CONTROL[/bold italic white on black] Enabled", box=box.HEAVY, expand=False), style="white on black")
# print(">\n\n")
# console.print(Panel("[bold italic white on black]OS CONTROL[/bold italic white on black] Enabled", box=box.DOUBLE, expand=False), style="white on black")
# print(">\n\n")
# console.print(Panel("[bold italic white on black]OS CONTROL[/bold italic white on black] Enabled", box=box.SQUARE, expand=False), style="white on black")
if not interpreter.offline and not interpreter.auto_run:
api_message = "To find items on the screen, Open Interpreter has been instructed to send screenshots to [api.openinterpreter.com](https://api.openinterpreter.com/) (we do not store them). Add `--offline` to attempt this locally."
interpreter.display_message(api_message)
print("")
if not interpreter.auto_run:
screen_recording_message = "**Make sure that screen recording permissions are enabled for your Terminal or Python environment.**"
interpreter.display_message(screen_recording_message)
print("")
# # FOR TESTING ONLY
# # Install Open Interpreter from GitHub
# for chunk in interpreter.computer.run(
# "shell",
# "pip install git+https://github.com/KillianLucas/open-interpreter.git",
# ):
# if chunk.get("format") != "active_line":
# print(chunk.get("content"))
from platformdirs import user_data_dir
# Directory paths
repo_skills_dir = os.path.join(os.path.dirname(__file__), "skills")
user_data_skills_dir = os.path.join(user_data_dir("01"), "skills")
# Create the user data skills directory if it doesn't exist
os.makedirs(user_data_skills_dir, exist_ok=True)
# Copy Python files from the repository skills directory to the user data skills directory, ignoring __init__.py files
for filename in os.listdir(repo_skills_dir):
if filename.endswith(".py") and filename != "__init__.py":
src_file = os.path.join(repo_skills_dir, filename)
dst_file = os.path.join(user_data_skills_dir, filename)
shutil.copy2(src_file, dst_file)
interpreter.computer.debug = True
interpreter.computer.skills.path = user_data_skills_dir
# Import skills
interpreter.computer.save_skills = False
for file in glob.glob(os.path.join(interpreter.computer.skills.path, "*.py")):
code_to_run = ""
with open(file, "r") as f:
code_to_run += f.read() + "\n"
interpreter.computer.run("python", code_to_run)
interpreter.computer.save_skills = True
# Initialize user's task list
interpreter.computer.run(
language="python",
code="tasks = []",
display=interpreter.verbose,
)
# Give it access to the computer via Python
interpreter.computer.run(
language="python",
code="import time\nfrom interpreter import interpreter\ncomputer = interpreter.computer", # We ask it to use time, so
display=interpreter.verbose,
)
if not interpreter.auto_run:
interpreter.display_message(
"**Warning:** In this mode, Open Interpreter will not require approval before performing actions. Be ready to close your terminal."
)
print("") # < - Aesthetic choice
### MISC SETTINGS
interpreter.auto_run = True
interpreter.computer.languages = [
l
for l in interpreter.computer.languages
if l.name.lower() in ["applescript", "shell", "zsh", "bash", "python"]
]
interpreter.force_task_completion = True
# interpreter.offline = True
interpreter.id = 206 # Used to identify itself to other interpreters. This should be changed programmatically so it's unique.
### RESET conversations/user.json
app_dir = user_data_dir("01")
conversations_dir = os.path.join(app_dir, "conversations")
os.makedirs(conversations_dir, exist_ok=True)
user_json_path = os.path.join(conversations_dir, "user.json")
with open(user_json_path, "w") as file:
json.dump([], file)
return interpreter

@ -1,29 +0,0 @@
from dotenv import load_dotenv
load_dotenv() # take environment variables from .env.
import os
import subprocess
from pathlib import Path
### LLM SETUP
# Define the path to a llamafile
llamafile_path = Path(__file__).parent / "model.llamafile"
# Check if the new llamafile exists, if not download it
if not os.path.exists(llamafile_path):
subprocess.run(
[
"wget",
"-O",
llamafile_path,
"https://huggingface.co/jartine/phi-2-llamafile/resolve/main/phi-2.Q4_K_M.llamafile",
],
check=True,
)
# Make the new llamafile executable
subprocess.run(["chmod", "+x", llamafile_path], check=True)
# Run the new llamafile
subprocess.run([str(llamafile_path)], check=True)

@ -0,0 +1,186 @@
from interpreter import interpreter
# This is an Open Interpreter compatible profile.
# Visit https://01.openinterpreter.com/profile for all options.
# 01 supports OpenAI, ElevenLabs, and Coqui (Local) TTS providers
# {OpenAI: "openai", ElevenLabs: "elevenlabs", Coqui: "coqui"}
interpreter.tts = "openai"
# Connect your 01 to a language model
interpreter.llm.model = "gpt-4-turbo"
interpreter.llm.context_window = 100000
interpreter.llm.max_tokens = 4096
# interpreter.llm.api_key = "<your_openai_api_key_here>"
# Tell your 01 where to find and save skills
interpreter.computer.skills.path = "./skills"
# Extra settings
interpreter.computer.import_computer_api = True
interpreter.computer.import_skills = True
interpreter.computer.run("python", "computer") # This will trigger those imports
interpreter.auto_run = True
interpreter.loop = True
interpreter.loop_message = """Proceed with what you were doing (this is not confirmation, if you just asked me something). You CAN run code on my machine. If you want to run code, start your message with "```"! If the entire task is done, say exactly 'The task is done.' If you need some specific information (like username, message text, skill name, skill step, etc.) say EXACTLY 'Please provide more information.' If it's impossible, say 'The task is impossible.' (If I haven't provided a task, say exactly 'Let me know what you'd like to do next.') Otherwise keep going. CRITICAL: REMEMBER TO FOLLOW ALL PREVIOUS INSTRUCTIONS. If I'm teaching you something, remember to run the related `computer.skills.new_skill` function."""
interpreter.loop_breakers = [
"The task is done.",
"The task is impossible.",
"Let me know what you'd like to do next.",
"Please provide more information.",
]
# Set the identity and personality of your 01
interpreter.system_message = """
You are the 01, a screenless executive assistant that can complete any task.
When you execute code, it will be executed on the user's machine. The user has given you full and complete permission to execute any code necessary to complete the task.
Run any code to achieve the goal, and if at first you don't succeed, try again and again.
You can install new packages.
Be concise. Your messages are being read aloud to the user. DO NOT MAKE PLANS. RUN CODE QUICKLY.
Try to spread complex tasks over multiple code blocks. Don't try to do complex tasks in one go.
Manually summarize text.
DON'T TELL THE USER THE METHOD YOU'LL USE, OR MAKE PLANS. ACT LIKE THIS:
---
user: Are there any concerts in Seattle?
assistant: Let me check on that.
```python
computer.browser.search("concerts in Seattle")
```
```output
Upcoming concerts: Bad Bunny at Neumos...
```
It looks like there's a Bad Bunny concert at Neumos...
---
Act like you can just answer any question, then run code (this is hidden from the user) to answer it.
THE USER CANNOT SEE CODE BLOCKS.
Your responses should be very short, no more than 1-2 sentences long.
DO NOT USE MARKDOWN. ONLY WRITE PLAIN TEXT.
# THE COMPUTER API
The `computer` module is ALREADY IMPORTED, and can be used for some tasks:
```python
result_string = computer.browser.search(query) # Google search results will be returned from this function as a string
computer.files.edit(path_to_file, original_text, replacement_text) # Edit a file
computer.calendar.create_event(title="Meeting", start_date=datetime.datetime.now(), end_date=datetime.datetime.now() + datetime.timedelta(hours=1), notes="Note", location="") # Creates a calendar event
events_string = computer.calendar.get_events(start_date=datetime.date.today(), end_date=None) # Get events between dates. If end_date is None, only gets events for start_date
computer.calendar.delete_event(event_title="Meeting", start_date=datetime.datetime) # Delete a specific event with a matching title and start date, you may need to use get_events() to find the specific event object first
phone_string = computer.contacts.get_phone_number("John Doe")
contact_string = computer.contacts.get_email_address("John Doe")
computer.mail.send("john@email.com", "Meeting Reminder", "Reminder that our meeting is at 3pm today.", ["path/to/attachment.pdf", "path/to/attachment2.pdf"]) # Send an email with optional attachments
emails_string = computer.mail.get(4, unread=True) # Returns the {number} of unread emails, or all emails if False is passed
unread_num = computer.mail.unread_count() # Returns the number of unread emails
computer.sms.send("555-123-4567", "Hello from the computer!") # Send a text message. MUST be a phone number, so use computer.contacts.get_phone_number frequently here
```
Do not import the computer module, or any of its sub-modules. They are already imported.
DO NOT use the computer module for ALL tasks. Many tasks can be accomplished via Python, or by pip installing new libraries. Be creative!
# GUI CONTROL (RARE)
You are a computer controlling language model. You can control the user's GUI.
You may use the `computer` module to control the user's keyboard and mouse, if the task **requires** it:
```python
computer.display.view() # Shows you what's on the screen. **You almost always want to do this first!**
computer.keyboard.hotkey(" ", "command") # Opens spotlight
computer.keyboard.write("hello")
computer.mouse.click("text onscreen") # This clicks on the UI element with that text. Use this **frequently** and get creative! To click a video, you could pass the *timestamp* (which is usually written on the thumbnail) into this.
computer.mouse.move("open recent >") # This moves the mouse over the UI element with that text. Many dropdowns will disappear if you click them. You have to hover over items to reveal more.
computer.mouse.click(x=500, y=500) # Use this very, very rarely. It's highly inaccurate
computer.mouse.click(icon="gear icon") # Moves mouse to the icon with that description. Use this very often
computer.mouse.scroll(-10) # Scrolls down. If you don't find some text on screen that you expected to be there, you probably want to do this
```
You are an image-based AI, you can see images.
Clicking text is the most reliable way to use the mouse, for example clicking a URL's text you see in the URL bar, or some textarea's placeholder text (like "Search" to get into a search bar).
If you use `plt.show()`, the resulting image will be sent to you. However, if you use `PIL.Image.show()`, the resulting image will NOT be sent to you.
It is very important to make sure you are focused on the right application and window. Often, your first command should always be to explicitly switch to the correct application. On Macs, ALWAYS use Spotlight to switch applications.
When searching the web, use query parameters. For example, https://www.amazon.com/s?k=monitor
# SKILLS
Try to use the following special functions (or "skills") to complete your goals whenever possible.
THESE ARE ALREADY IMPORTED. YOU CAN CALL THEM INSTANTLY.
---
{{
import sys
import os
import json
import ast
directory = "./skills"
def get_function_info(file_path):
with open(file_path, "r") as file:
tree = ast.parse(file.read())
functions = [node for node in tree.body if isinstance(node, ast.FunctionDef)]
for function in functions:
docstring = ast.get_docstring(function)
args = [arg.arg for arg in function.args.args]
print(f"Function Name: {function.name}")
print(f"Arguments: {args}")
print(f"Docstring: {docstring}")
print("---")
files = os.listdir(directory)
for file in files:
if file.endswith(".py"):
file_path = os.path.join(directory, file)
get_function_info(file_path)
}}
YOU can add to the above list of skills by defining a python function. The function will be saved as a skill.
Search all existing skills by running `computer.skills.search(query)`.
**Teach Mode**
If the USER says they want to teach you something, exactly write the following, including the markdown code block:
---
One moment.
```python
computer.skills.new_skill.create()
```
---
If you decide to make a skill yourself to help the user, simply define a python function. `computer.skills.new_skill.create()` is for user-described skills.
# USE COMMENTS TO PLAN
IF YOU NEED TO THINK ABOUT A PROBLEM: (such as "Here's the plan:"), WRITE IT IN THE COMMENTS of the code block!
---
User: What is 432/7?
Assistant: Let me think about that.
```python
# Here's the plan:
# 1. Divide the numbers
# 2. Round to 3 digits
print(round(432/7, 3))
```
```output
61.714
```
The answer is 61.714.
---
# MANUAL TASKS
Translate things to other languages INSTANTLY and MANUALLY. Don't ever try to use a translation tool.
Summarize things manually. DO NOT use a summarizer tool.
# CRITICAL NOTES
Code output, despite being sent to you by the user, cannot be seen by the user. You NEED to tell the user about the output of some code, even if it's exact. >>The user does not have a screen.<<
ALWAYS REMEMBER: You are running on a device called the O1, where the interface is entirely speech-based. Make your responses to the user VERY short. DO NOT PLAN. BE CONCISE. WRITE CODE TO RUN IT.
Try multiple methods before saying the task is impossible. **You can do it!**
""".strip()

@ -0,0 +1,24 @@
from interpreter import interpreter
# This is an Open Interpreter compatible profile.
# Visit https://01.openinterpreter.com/profile for all options.
# 01 supports OpenAI, ElevenLabs, and Coqui (Local) TTS providers
# {OpenAI: "openai", ElevenLabs: "elevenlabs", Coqui: "coqui"}
interpreter.tts = "elevenlabs"
# 01 Language Model Config.
interpreter.llm_service = "litellm"
interpreter.llm.model = "groq/llama3-8b-8192"
interpreter.llm.supports_vision = False
interpreter.llm.supports_functions = False
interpreter.llm.context_window = 2048
interpreter.llm.max_tokens = 4096
interpreter.llm.temperature = 0.8
interpreter.computer.import_computer_api = False
interpreter.auto_run = True
interpreter.system_message = (
"You are a helpful assistant that can answer questions and help with tasks."
)

@ -0,0 +1,38 @@
from interpreter import interpreter
# 01 supports OpenAI, ElevenLabs, and Coqui (Local) TTS providers
# {OpenAI: "openai", ElevenLabs: "elevenlabs", Coqui: "coqui"}
interpreter.tts = "coqui"
# Local setup
interpreter.local_setup()
interpreter.system_message = """You are an AI assistant that writes markdown code snippets to answer the user's request. You speak very concisely and quickly, you say nothing irrelevant to the user's request. For example:
User: Open the chrome app.
Assistant: On it.
```python
import webbrowser
webbrowser.open('https://chrome.google.com')
```
User: The code you ran produced no output. Was this expected, or are we finished?
Assistant: No further action is required; the provided snippet opens Chrome.
Now, your turn:"""
# Message templates
interpreter.code_output_template = '''I executed that code. This was the output: """{content}"""\n\nWhat does this output mean (I can't understand it, please help) / what code needs to be run next (if anything, or are we done)? I can't replace any placeholders.'''
interpreter.empty_code_output_template = "The code above was executed on my machine. It produced no text output. What's next (if anything, or are we done?)"
interpreter.code_output_sender = "user"
# Computer settings
interpreter.computer.import_computer_api = False
# Misc settings
interpreter.auto_run = False
interpreter.offline = True
# Final message
interpreter.display_message(
f"> Model set to `{interpreter.llm.model}`\n\n**Open Interpreter** will require approval before running code.\n\nUse `interpreter -y` to bypass this.\n\nPress `CTRL-C` to exit.\n"
)
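As the comments at the top of `async_server.py` note, switching between these profiles is a one-line import swap, and new profiles are just additional modules in the same directory. A sketch of adding one (the file name and settings shown are illustrative):

```python
# profiles/my_profile.py -- illustrative; place it alongside default.py, fast.py, local.py
from interpreter import interpreter

interpreter.tts = "openai"                     # "openai", "elevenlabs", or "coqui"
interpreter.llm.model = "groq/llama3-8b-8192"  # any model string litellm accepts
interpreter.auto_run = True

# Then serve it by changing the profile import in async_server.py:
# from .profiles.my_profile import interpreter as base_interpreter
```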

@ -1,520 +0,0 @@
from dotenv import load_dotenv
load_dotenv() # take environment variables from .env.
import traceback
from platformdirs import user_data_dir
import json
import queue
import os
import datetime
from .utils.bytes_to_wav import bytes_to_wav
import re
from fastapi import FastAPI, Request
from fastapi.responses import PlainTextResponse
from starlette.websockets import WebSocket, WebSocketDisconnect
import asyncio
from .utils.kernel import put_kernel_messages_into_queue
from .i import configure_interpreter
from interpreter import interpreter
from ..utils.accumulator import Accumulator
from .utils.logs import setup_logging
from .utils.logs import logger
import base64
import shutil
from ..utils.print_markdown import print_markdown
os.environ["STT_RUNNER"] = "server"
os.environ["TTS_RUNNER"] = "server"
markdown = """
*Starting...*
"""
print("")
print_markdown(markdown)
print("")
setup_logging()
accumulator_global = Accumulator()
app = FastAPI()
app_dir = user_data_dir("01")
conversation_history_path = os.path.join(app_dir, "conversations", "user.json")
SERVER_LOCAL_PORT = int(os.getenv("SERVER_LOCAL_PORT", 10001))
# This is so we only say() full sentences
def is_full_sentence(text):
return text.endswith((".", "!", "?"))
def split_into_sentences(text):
return re.split(r"(?<=[.!?])\s+", text)
# Queues
from_computer = (
queue.Queue()
) # Just for computer messages from the device. Sync queue because interpreter.run is synchronous
from_user = asyncio.Queue() # Just for user messages from the device.
to_device = asyncio.Queue() # For messages we send.
# Switch code executor to device if that's set
if os.getenv("CODE_RUNNER") == "device":
# (This should probably just loop through all languages and apply these changes instead)
class Python:
# This is the name that will appear to the LLM.
name = "python"
def __init__(self):
self.halt = False
def run(self, code):
"""Generator that yields a dictionary in LMC Format."""
# Prepare the data
message = {
"role": "assistant",
"type": "code",
"format": "python",
"content": code,
}
# Unless it was just sent to the device, send it wrapped in flags
if not (interpreter.messages and interpreter.messages[-1] == message):
to_device.put(
{
"role": "assistant",
"type": "code",
"format": "python",
"start": True,
}
)
to_device.put(message)
to_device.put(
{
"role": "assistant",
"type": "code",
"format": "python",
"end": True,
}
)
# Stream the response
logger.info("Waiting for the device to respond...")
while True:
chunk = from_computer.get()
logger.info(f"Server received from device: {chunk}")
if "end" in chunk:
break
yield chunk
def stop(self):
self.halt = True
def terminate(self):
"""Terminates the entire process."""
# dramatic!! do nothing
pass
interpreter.computer.languages = [Python]
# Configure interpreter
interpreter = configure_interpreter(interpreter)
@app.get("/ping")
async def ping():
return PlainTextResponse("pong")
@app.websocket("/")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
receive_task = asyncio.create_task(receive_messages(websocket))
send_task = asyncio.create_task(send_messages(websocket))
try:
await asyncio.gather(receive_task, send_task)
except Exception as e:
logger.debug(traceback.format_exc())
logger.info(f"Connection lost. Error: {e}")
@app.post("/")
async def add_computer_message(request: Request):
body = await request.json()
text = body.get("text")
if not text:
return {"error": "Missing 'text' in request body"}, 422
message = {"role": "user", "type": "message", "content": text}
await from_user.put({"role": "user", "type": "message", "start": True})
await from_user.put(message)
await from_user.put({"role": "user", "type": "message", "end": True})
async def receive_messages(websocket: WebSocket):
while True:
try:
try:
data = await websocket.receive()
except Exception as e:
print(str(e))
return
if "text" in data:
try:
data = json.loads(data["text"])
if data["role"] == "computer":
from_computer.put(
data
) # To be handled by interpreter.computer.run
elif data["role"] == "user":
await from_user.put(data)
else:
raise ("Unknown role:", data)
except json.JSONDecodeError:
pass # data is not JSON, leave it as is
elif "bytes" in data:
data = data["bytes"] # binary data
await from_user.put(data)
except WebSocketDisconnect as e:
if e.code == 1000:
logger.info("Websocket connection closed normally.")
return
else:
raise
async def send_messages(websocket: WebSocket):
while True:
message = await to_device.get()
try:
if isinstance(message, dict):
# print(f"Sending to the device: {type(message)} {str(message)[:100]}")
await websocket.send_json(message)
elif isinstance(message, bytes):
# print(f"Sending to the device: {type(message)} {str(message)[:100]}")
await websocket.send_bytes(message)
else:
raise TypeError("Message must be a dict or bytes")
except:
# Make sure to put the message back in the queue if you failed to send it
await to_device.put(message)
raise
async def listener(mobile: bool):
while True:
try:
if mobile:
accumulator_mobile = Accumulator()
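            # Poll both queues (roughly once a second) and take the first chunk that arrives,
            # whether it came from the user/device or from the computer (kernel/code output).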
while True:
if not from_user.empty():
chunk = await from_user.get()
break
elif not from_computer.empty():
chunk = from_computer.get()
break
await asyncio.sleep(1)
if mobile:
message = accumulator_mobile.accumulate_mobile(chunk)
else:
message = accumulator_global.accumulate(chunk)
            if message is None:
# Will be None until we have a full message ready
continue
# print(str(message)[:1000])
# At this point, we have our message
if message["type"] == "audio" and message["format"].startswith("bytes"):
if (
"content" not in message
or message["content"] == None
or message["content"] == ""
): # If it was nothing / silence / empty
continue
# Convert bytes to audio file
# Format will be bytes.wav or bytes.opus
mime_type = "audio/" + message["format"].split(".")[1]
# print("input audio file content", message["content"][:100])
audio_file_path = bytes_to_wav(message["content"], mime_type)
# print("Audio file path:", audio_file_path)
# For microphone debugging:
if False:
os.system(f"open {audio_file_path}")
import time
time.sleep(15)
text = stt(audio_file_path)
print("> ", text)
message = {"role": "user", "type": "message", "content": text}
# At this point, we have only text messages
if type(message["content"]) != str:
print("This should be a string, but it's not:", message["content"])
message["content"] = message["content"].decode()
# Custom stop message will halt us
if message["content"].lower().strip(".,! ") == "stop":
continue
# Load, append, and save conversation history
with open(conversation_history_path, "r") as file:
messages = json.load(file)
messages.append(message)
with open(conversation_history_path, "w") as file:
json.dump(messages, file, indent=4)
accumulated_text = ""
if any(
[m["type"] == "image" for m in messages]
) and interpreter.llm.model.startswith("gpt-"):
interpreter.llm.model = "gpt-4-vision-preview"
interpreter.llm.supports_vision = True
for chunk in interpreter.chat(messages, stream=True, display=True):
if any([m["type"] == "image" for m in interpreter.messages]):
interpreter.llm.model = "gpt-4-vision-preview"
logger.debug("Got chunk:", chunk)
# Send it to the user
await to_device.put(chunk)
# Yield to the event loop, so you actually send it out
await asyncio.sleep(0.01)
if os.getenv("TTS_RUNNER") == "server":
# Speak full sentences out loud
if (
chunk["role"] == "assistant"
and "content" in chunk
and chunk["type"] == "message"
):
accumulated_text += chunk["content"]
sentences = split_into_sentences(accumulated_text)
                        # If we're about to speak, signal that we should stop sending text.
                        # TODO: ideally text and audio could be streamed in parallel, or only one of them sent.
if any(is_full_sentence(sentence) for sentence in sentences):
await to_device.put(
{"role": "assistant", "type": "message", "end": True}
)
if is_full_sentence(sentences[-1]):
for sentence in sentences:
await stream_tts_to_device(sentence, mobile)
accumulated_text = ""
else:
for sentence in sentences[:-1]:
await stream_tts_to_device(sentence, mobile)
accumulated_text = sentences[-1]
                        # Once the sentences have been spoken, signal that we're resuming text output.
if any(is_full_sentence(sentence) for sentence in sentences):
await to_device.put(
{"role": "assistant", "type": "message", "start": True}
)
# If we have a new message, save our progress and go back to the top
if not from_user.empty():
# Check if it's just an end flag. We ignore those.
temp_message = await from_user.get()
if (
type(temp_message) is dict
and temp_message.get("role") == "user"
and temp_message.get("end")
):
# Yup. False alarm.
continue
else:
# Whoops! Put that back
await from_user.put(temp_message)
with open(conversation_history_path, "w") as file:
json.dump(interpreter.messages, file, indent=4)
# TODO: is triggering seemingly randomly
# logger.info("New user message received. Breaking.")
# break
# Also check if there's any new computer messages
if not from_computer.empty():
with open(conversation_history_path, "w") as file:
json.dump(interpreter.messages, file, indent=4)
logger.info("New computer message received. Breaking.")
break
except:
traceback.print_exc()
async def stream_tts_to_device(sentence, mobile: bool):
force_task_completion_responses = [
"the task is done",
"the task is impossible",
"let me know what you'd like to do next",
]
if sentence.lower().strip().strip(".!?").strip() in force_task_completion_responses:
return
for chunk in stream_tts(sentence, mobile):
await to_device.put(chunk)
def stream_tts(sentence, mobile: bool):
audio_file = tts(sentence, mobile)
# Read the entire WAV file
with open(audio_file, "rb") as f:
audio_bytes = f.read()
if mobile:
file_type = "audio/wav"
os.remove(audio_file)
# stream the audio as a single sentence
yield {
"role": "assistant",
"type": "audio",
"format": file_type,
"content": base64.b64encode(audio_bytes).decode("utf-8"),
"start": True,
"end": True,
}
else:
# stream the audio in chunk sizes
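        # Raw 16-bit PCM is streamed as small (1024-byte) frames between start/end flags so
        # the device can begin playback before the whole clip has arrived.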
os.remove(audio_file)
file_type = "bytes.raw"
chunk_size = 1024
yield {"role": "assistant", "type": "audio", "format": file_type, "start": True}
for i in range(0, len(audio_bytes), chunk_size):
chunk = audio_bytes[i : i + chunk_size]
yield chunk
yield {"role": "assistant", "type": "audio", "format": file_type, "end": True}
from uvicorn import Config, Server
import os
from importlib import import_module
# these will be overwritten
HOST = ""
PORT = 0
@app.on_event("startup")
async def startup_event():
server_url = f"{HOST}:{PORT}"
print("")
print_markdown("\n*Ready.*\n")
print("")
@app.on_event("shutdown")
async def shutdown_event():
print_markdown("*Server is shutting down*")
async def main(
server_host,
server_port,
llm_service,
model,
llm_supports_vision,
llm_supports_functions,
context_window,
max_tokens,
temperature,
tts_service,
stt_service,
mobile,
):
global HOST
global PORT
PORT = server_port
HOST = server_host
# Setup services
application_directory = user_data_dir("01")
services_directory = os.path.join(application_directory, "services")
service_dict = {"llm": llm_service, "tts": tts_service, "stt": stt_service}
# Create a temp file with the session number
session_file_path = os.path.join(user_data_dir("01"), "01-session.txt")
with open(session_file_path, "w") as session_file:
session_id = int(datetime.datetime.now().timestamp() * 1000)
session_file.write(str(session_id))
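    # Each service implementation lives at source/server/services/<kind>/<name>/<kind>.py and
    # exposes a class named after the kind (Llm, Tts, Stt); the loop below instantiates it and
    # promotes its bound method to a module-level global (llm, tts, stt).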
for service in service_dict:
service_directory = os.path.join(
services_directory, service, service_dict[service]
)
# This is the folder they can mess around in
config = {"service_directory": service_directory}
if service == "llm":
config.update(
{
"interpreter": interpreter,
"model": model,
"llm_supports_vision": llm_supports_vision,
"llm_supports_functions": llm_supports_functions,
"context_window": context_window,
"max_tokens": max_tokens,
"temperature": temperature,
}
)
module = import_module(
f".server.services.{service}.{service_dict[service]}.{service}",
package="source",
)
ServiceClass = getattr(module, service.capitalize())
service_instance = ServiceClass(config)
globals()[service] = getattr(service_instance, service)
interpreter.llm.completions = llm
# Start listening
asyncio.create_task(listener(mobile))
# Start watching the kernel if it's your job to do that
if True: # in the future, code can run on device. for now, just server.
asyncio.create_task(put_kernel_messages_into_queue(from_computer))
config = Config(app, host=server_host, port=int(server_port), lifespan="on")
server = Server(config)
await server.serve()
# Run the FastAPI app
if __name__ == "__main__":
asyncio.run(main())

@ -1,11 +0,0 @@
class Llm:
def __init__(self, config):
# Litellm is used by OI by default, so we just modify OI
interpreter = config["interpreter"]
config.pop("interpreter", None)
config.pop("service_directory", None)
for key, value in config.items():
setattr(interpreter, key.replace("-", "_"), value)
self.llm = interpreter.llm.completions

@ -1,68 +0,0 @@
import os
import subprocess
import requests
import json
class Llm:
def __init__(self, config):
self.install(config["service_directory"])
def install(self, service_directory):
LLM_FOLDER_PATH = service_directory
self.llm_directory = os.path.join(LLM_FOLDER_PATH, "llm")
if not os.path.isdir(self.llm_directory): # Check if the LLM directory exists
os.makedirs(LLM_FOLDER_PATH, exist_ok=True)
            # Install WasmEdge (the curl | bash pipeline needs a shell, so pass a single command string)
            subprocess.run(
                "curl -sSf https://raw.githubusercontent.com/WasmEdge/WasmEdge/master/utils/install.sh"
                " | bash -s -- --plugin wasi_nn-ggml",
                shell=True,
            )
# Download the Qwen1.5-0.5B-Chat model GGUF file
MODEL_URL = "https://huggingface.co/second-state/Qwen1.5-0.5B-Chat-GGUF/resolve/main/Qwen1.5-0.5B-Chat-Q5_K_M.gguf"
subprocess.run(["curl", "-LO", MODEL_URL], cwd=self.llm_directory)
# Download the llama-api-server.wasm app
APP_URL = "https://github.com/LlamaEdge/LlamaEdge/releases/latest/download/llama-api-server.wasm"
subprocess.run(["curl", "-LO", APP_URL], cwd=self.llm_directory)
            # Start the API server in the background (it is a long-running process, so don't block on it)
            subprocess.Popen(
                [
                    "wasmedge",
                    "--dir",
                    ".:.",
                    "--nn-preload",
                    "default:GGML:AUTO:Qwen1.5-0.5B-Chat-Q5_K_M.gguf",
                    "llama-api-server.wasm",
                    "-p",
                    "llama-2-chat",
                ],
                cwd=self.llm_directory,
            )
print("LLM setup completed.")
else:
print("LLM already set up. Skipping download.")
def llm(self, messages):
url = "http://localhost:8080/v1/chat/completions"
headers = {"accept": "application/json", "Content-Type": "application/json"}
data = {"messages": messages, "model": "llama-2-chat"}
with requests.post(
url, headers=headers, data=json.dumps(data), stream=True
) as response:
for line in response.iter_lines():
if line:
yield json.loads(line)

@ -1,87 +0,0 @@
import os
import platform
import subprocess
import time
import wget
import stat
class Llm:
def __init__(self, config):
self.interpreter = config["interpreter"]
config.pop("interpreter", None)
self.install(config["service_directory"])
config.pop("service_directory", None)
for key, value in config.items():
setattr(self.interpreter, key.replace("-", "_"), value)
self.llm = self.interpreter.llm.completions
def install(self, service_directory):
if platform.system() == "Darwin": # Check if the system is MacOS
result = subprocess.run(
["xcode-select", "-p"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
if result.returncode != 0:
print(
"Llamafile requires Mac users to have Xcode installed. You can install Xcode from https://developer.apple.com/xcode/ .\n\nAlternatively, you can use `LM Studio`, `Jan.ai`, or `Ollama` to manage local language models. Learn more at https://docs.openinterpreter.com/guides/running-locally ."
)
time.sleep(3)
raise Exception(
"Xcode is not installed. Please install Xcode and try again."
)
# Define the path to the models directory
models_dir = os.path.join(service_directory, "models")
# Check and create the models directory if it doesn't exist
if not os.path.exists(models_dir):
os.makedirs(models_dir)
# Define the path to the new llamafile
llamafile_path = os.path.join(models_dir, "phi-2.Q4_K_M.llamafile")
# Check if the new llamafile exists, if not download it
if not os.path.exists(llamafile_path):
print(
"Attempting to download the `Phi-2` language model. This may take a few minutes."
)
time.sleep(3)
url = "https://huggingface.co/jartine/phi-2-llamafile/resolve/main/phi-2.Q4_K_M.llamafile"
wget.download(url, llamafile_path)
# Make the new llamafile executable
if platform.system() != "Windows":
st = os.stat(llamafile_path)
os.chmod(llamafile_path, st.st_mode | stat.S_IEXEC)
# Run the new llamafile in the background
if os.path.exists(llamafile_path):
try:
# Test if the llamafile is executable
subprocess.check_call([f'"{llamafile_path}"'], shell=True)
except subprocess.CalledProcessError:
print(
"The llamafile is not executable. Please check the file permissions."
)
raise
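            # "-ngl 9999" asks llamafile (llama.cpp) to offload as many model layers as it can to the GPU.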
subprocess.Popen(
f'"{llamafile_path}" ' + " ".join(["-ngl", "9999"]), shell=True
)
else:
error_message = "The llamafile does not exist or is corrupted. Please ensure it has been downloaded correctly or try again."
            print(error_message)
self.interpreter.system_message = "You are Open Interpreter, a world-class programmer that can execute code on the user's machine."
self.interpreter.offline = True
self.interpreter.llm.model = "local"
self.interpreter.llm.temperature = 0
        self.interpreter.llm.api_base = "http://localhost:8080/v1"
self.interpreter.llm.max_tokens = 1000
self.interpreter.llm.context_window = 3000
self.interpreter.llm.supports_functions = False

@ -1,169 +0,0 @@
"""
Defines a function which takes a path to an audio file and turns it into text.
"""
from datetime import datetime
import os
import contextlib
import tempfile
import shutil
import ffmpeg
import subprocess
import urllib.request
class Stt:
def __init__(self, config):
self.service_directory = config["service_directory"]
install(self.service_directory)
def stt(self, audio_file_path):
return stt(self.service_directory, audio_file_path)
def install(service_dir):
### INSTALL
WHISPER_RUST_PATH = os.path.join(service_dir, "whisper-rust")
script_dir = os.path.dirname(os.path.realpath(__file__))
source_whisper_rust_path = os.path.join(script_dir, "whisper-rust")
if not os.path.exists(source_whisper_rust_path):
print(f"Source directory does not exist: {source_whisper_rust_path}")
exit(1)
if not os.path.exists(WHISPER_RUST_PATH):
shutil.copytree(source_whisper_rust_path, WHISPER_RUST_PATH)
os.chdir(WHISPER_RUST_PATH)
# Check if whisper-rust executable exists before attempting to build
if not os.path.isfile(
os.path.join(WHISPER_RUST_PATH, "target/release/whisper-rust")
):
# Check if Rust is installed. Needed to build whisper executable
rustc_path = shutil.which("rustc")
if rustc_path is None:
print(
"Rust is not installed or is not in system PATH. Please install Rust before proceeding."
)
exit(1)
# Build Whisper Rust executable if not found
subprocess.run(["cargo", "build", "--release"], check=True)
else:
print("Whisper Rust executable already exists. Skipping build.")
WHISPER_MODEL_PATH = os.path.join(service_dir, "model")
WHISPER_MODEL_NAME = os.getenv("WHISPER_MODEL_NAME", "ggml-tiny.en.bin")
WHISPER_MODEL_URL = os.getenv(
"WHISPER_MODEL_URL",
"https://huggingface.co/ggerganov/whisper.cpp/resolve/main/",
)
if not os.path.isfile(os.path.join(WHISPER_MODEL_PATH, WHISPER_MODEL_NAME)):
os.makedirs(WHISPER_MODEL_PATH, exist_ok=True)
urllib.request.urlretrieve(
f"{WHISPER_MODEL_URL}{WHISPER_MODEL_NAME}",
os.path.join(WHISPER_MODEL_PATH, WHISPER_MODEL_NAME),
)
else:
print("Whisper model already exists. Skipping download.")
def convert_mime_type_to_format(mime_type: str) -> str:
if mime_type == "audio/x-wav" or mime_type == "audio/wav":
return "wav"
if mime_type == "audio/webm":
return "webm"
if mime_type == "audio/raw":
return "dat"
return mime_type
@contextlib.contextmanager
def export_audio_to_wav_ffmpeg(audio: bytearray, mime_type: str) -> str:
temp_dir = tempfile.gettempdir()
# Create a temporary file with the appropriate extension
input_ext = convert_mime_type_to_format(mime_type)
input_path = os.path.join(
temp_dir, f"input_{datetime.now().strftime('%Y%m%d%H%M%S%f')}.{input_ext}"
)
with open(input_path, "wb") as f:
f.write(audio)
# Check if the input file exists
assert os.path.exists(input_path), f"Input file does not exist: {input_path}"
# Export to wav
output_path = os.path.join(
temp_dir, f"output_{datetime.now().strftime('%Y%m%d%H%M%S%f')}.wav"
)
print(mime_type, input_path, output_path)
if mime_type == "audio/raw":
ffmpeg.input(
input_path,
f="s16le",
ar="16000",
ac=1,
).output(output_path, loglevel="panic").run()
else:
ffmpeg.input(input_path).output(
output_path, acodec="pcm_s16le", ac=1, ar="16k", loglevel="panic"
).run()
try:
yield output_path
finally:
os.remove(input_path)
os.remove(output_path)
def run_command(command):
result = subprocess.run(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True
)
return result.stdout, result.stderr
def get_transcription_file(service_directory, wav_file_path: str):
local_path = os.path.join(service_directory, "model")
whisper_rust_path = os.path.join(
service_directory, "whisper-rust", "target", "release"
)
model_name = os.getenv("WHISPER_MODEL_NAME", "ggml-tiny.en.bin")
output, _ = run_command(
[
os.path.join(whisper_rust_path, "whisper-rust"),
"--model-path",
os.path.join(local_path, model_name),
"--file-path",
wav_file_path,
]
)
return output
def stt_wav(service_directory, wav_file_path: str):
temp_dir = tempfile.gettempdir()
output_path = os.path.join(
temp_dir, f"output_stt_{datetime.now().strftime('%Y%m%d%H%M%S%f')}.wav"
)
ffmpeg.input(wav_file_path).output(
output_path, acodec="pcm_s16le", ac=1, ar="16k", loglevel="panic"
).run()
try:
transcript = get_transcription_file(service_directory, output_path)
finally:
os.remove(output_path)
return transcript
def stt(service_directory, input_data):
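    # Only wav file paths reach this service; raw device audio is converted to wav upstream
    # (bytes_to_wav in the listener) before stt() is called.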
return stt_wav(service_directory, input_data)

@ -1,10 +0,0 @@
# Generated by Cargo
# will have compiled files and executables
debug/
target/
# These are backup files generated by rustfmt
**/*.rs.bk
# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb

File diff suppressed because it is too large

@ -1,14 +0,0 @@
[package]
name = "whisper-rust"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.79"
clap = { version = "4.4.18", features = ["derive"] }
cpal = "0.15.2"
hound = "3.5.1"
whisper-rs = "0.10.0"
whisper-rs-sys = "0.8.0"

@ -1,34 +0,0 @@
mod transcribe;
use clap::Parser;
use std::path::PathBuf;
use transcribe::transcribe;
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// This is the model for Whisper STT
#[arg(short, long, value_parser, required = true)]
model_path: PathBuf,
/// This is the wav audio file that will be converted from speech to text
#[arg(short, long, value_parser, required = true)]
file_path: Option<PathBuf>,
}
fn main() {
let args = Args::parse();
let file_path = match args.file_path {
Some(fp) => fp,
None => panic!("No file path provided")
};
let result = transcribe(&args.model_path, &file_path);
match result {
Ok(transcription) => print!("{}", transcription),
Err(e) => panic!("Error: {}", e),
}
}

@ -1,64 +0,0 @@
use whisper_rs::{FullParams, SamplingStrategy, WhisperContext, WhisperContextParameters};
use std::path::PathBuf;
/// Transcribes the given audio file using the whisper-rs library.
///
/// # Arguments
/// * `model_path` - Path to Whisper model file
/// * `file_path` - Path to the audio file to be transcribed.
///
/// # Returns
///
/// A Result containing a String with the transcription if successful, or an error message if not.
pub fn transcribe(model_path: &PathBuf, file_path: &PathBuf) -> Result<String, String> {
let model_path_str = model_path.to_str().expect("Not valid model path");
// Load a context and model
let ctx = WhisperContext::new_with_params(
model_path_str, // Replace with the actual path to the model
WhisperContextParameters::default(),
)
.map_err(|_| "failed to load model")?;
// Create a state
let mut state = ctx.create_state().map_err(|_| "failed to create state")?;
// Create a params object
// Note that currently the only implemented strategy is Greedy, BeamSearch is a WIP
let mut params = FullParams::new(SamplingStrategy::Greedy { best_of: 1 });
// Edit parameters as needed
params.set_n_threads(1); // Set the number of threads to use
params.set_translate(true); // Enable translation
params.set_language(Some("en")); // Set the language to translate to English
// Disable printing to stdout
params.set_print_special(false);
params.set_print_progress(false);
params.set_print_realtime(false);
params.set_print_timestamps(false);
// Load the audio file
let audio_data = std::fs::read(file_path)
.map_err(|e| format!("failed to read audio file: {}", e))?
.chunks_exact(2)
.map(|chunk| i16::from_ne_bytes([chunk[0], chunk[1]]))
.collect::<Vec<i16>>();
// Convert the audio data to the required format (16KHz mono i16 samples)
let audio_data = whisper_rs::convert_integer_to_float_audio(&audio_data);
// Run the model
state.full(params, &audio_data[..]).map_err(|_| "failed to run model")?;
// Fetch the results
let num_segments = state.full_n_segments().map_err(|_| "failed to get number of segments")?;
let mut transcription = String::new();
for i in 0..num_segments {
let segment = state.full_get_segment_text(i).map_err(|_| "failed to get segment")?;
transcription.push_str(&segment);
transcription.push('\n');
}
Ok(transcription)
}

@ -1,129 +0,0 @@
class Stt:
def __init__(self, config):
pass
def stt(self, audio_file_path):
return stt(audio_file_path)
from datetime import datetime
import os
import contextlib
import tempfile
import ffmpeg
import subprocess
import openai
from openai import OpenAI
client = OpenAI()
def convert_mime_type_to_format(mime_type: str) -> str:
if mime_type == "audio/x-wav" or mime_type == "audio/wav":
return "wav"
if mime_type == "audio/webm":
return "webm"
if mime_type == "audio/raw":
return "dat"
return mime_type
@contextlib.contextmanager
def export_audio_to_wav_ffmpeg(audio: bytearray, mime_type: str) -> str:
temp_dir = tempfile.gettempdir()
# Create a temporary file with the appropriate extension
input_ext = convert_mime_type_to_format(mime_type)
input_path = os.path.join(
temp_dir, f"input_{datetime.now().strftime('%Y%m%d%H%M%S%f')}.{input_ext}"
)
with open(input_path, "wb") as f:
f.write(audio)
# Check if the input file exists
assert os.path.exists(input_path), f"Input file does not exist: {input_path}"
# Export to wav
output_path = os.path.join(
temp_dir, f"output_{datetime.now().strftime('%Y%m%d%H%M%S%f')}.wav"
)
if mime_type == "audio/raw":
ffmpeg.input(
input_path,
f="s16le",
ar="16000",
ac=1,
).output(output_path, loglevel="panic").run()
else:
ffmpeg.input(input_path).output(
output_path, acodec="pcm_s16le", ac=1, ar="16k", loglevel="panic"
).run()
try:
yield output_path
finally:
os.remove(input_path)
os.remove(output_path)
def run_command(command):
result = subprocess.run(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True
)
return result.stdout, result.stderr
def get_transcription_file(wav_file_path: str):
local_path = os.path.join(os.path.dirname(__file__), "local_service")
whisper_rust_path = os.path.join(
os.path.dirname(__file__), "whisper-rust", "target", "release"
)
model_name = os.getenv("WHISPER_MODEL_NAME", "ggml-tiny.en.bin")
output, error = run_command(
[
os.path.join(whisper_rust_path, "whisper-rust"),
"--model-path",
os.path.join(local_path, model_name),
"--file-path",
wav_file_path,
]
)
return output
def get_transcription_bytes(audio_bytes: bytearray, mime_type):
with export_audio_to_wav_ffmpeg(audio_bytes, mime_type) as wav_file_path:
return get_transcription_file(wav_file_path)
def stt_bytes(audio_bytes: bytearray, mime_type="audio/wav"):
with export_audio_to_wav_ffmpeg(audio_bytes, mime_type) as wav_file_path:
return stt_wav(wav_file_path)
def stt_wav(wav_file_path: str):
    with open(wav_file_path, "rb") as audio_file:
        try:
            transcript = client.audio.transcriptions.create(
                model="whisper-1", file=audio_file, response_format="text"
            )
        except openai.BadRequestError as e:
            print(f"openai.BadRequestError: {e}")
            return None
    return transcript
def stt(input_data, mime_type="audio/wav"):
if isinstance(input_data, str):
return stt_wav(input_data)
elif isinstance(input_data, bytearray):
return stt_bytes(input_data, mime_type)
else:
raise ValueError(
"Input data should be either a path to a wav file (str) or audio bytes (bytearray)"
)
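# Usage sketch (file name is illustrative): stt("recording.wav") transcribes a wav file with
# OpenAI's whisper-1, while stt(bytearray_audio, "audio/raw") converts raw 16 kHz PCM to wav first.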

@ -1,50 +0,0 @@
import ffmpeg
import tempfile
from openai import OpenAI
import os
from source.server.utils.logs import logger
from source.server.utils.logs import setup_logging
setup_logging()
# If this TTS service is used, the OPENAI_API_KEY environment variable must be set
if not os.getenv("OPENAI_API_KEY"):
logger.error("")
logger.error(
"OpenAI API key not found. Please set the OPENAI_API_KEY environment variable, or run 01 with the --local option."
)
logger.error("Aborting...")
logger.error("")
os._exit(1)
client = OpenAI()
class Tts:
def __init__(self, config):
pass
def tts(self, text, mobile):
response = client.audio.speech.create(
model="tts-1",
voice=os.getenv("OPENAI_VOICE_NAME", "alloy"),
input=text,
response_format="opus",
)
with tempfile.NamedTemporaryFile(suffix=".opus", delete=False) as temp_file:
response.stream_to_file(temp_file.name)
# TODO: hack to format audio correctly for device
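        # Mobile clients get a 16 kHz mono wav; other devices get headerless 16-bit PCM (s16le)
        # that the server can stream to the device in raw chunks.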
if mobile:
outfile = tempfile.gettempdir() + "/" + "output.wav"
ffmpeg.input(temp_file.name).output(
outfile, f="wav", ar="16000", ac="1", loglevel="panic"
).run()
else:
outfile = tempfile.gettempdir() + "/" + "raw.dat"
ffmpeg.input(temp_file.name).output(
outfile, f="s16le", ar="16000", ac="1", loglevel="panic"
).run()
return outfile

@ -1,171 +0,0 @@
import ffmpeg
import tempfile
import os
import subprocess
import urllib.request
import tarfile
import platform
class Tts:
def __init__(self, config):
self.piper_directory = ""
self.install(config["service_directory"])
def tts(self, text, mobile):
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_file:
output_file = temp_file.name
piper_dir = self.piper_directory
subprocess.run(
[
os.path.join(piper_dir, "piper"),
"--model",
os.path.join(
piper_dir,
os.getenv("PIPER_VOICE_NAME", "en_US-lessac-medium.onnx"),
),
"--output_file",
output_file,
],
input=text,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
# TODO: hack to format audio correctly for device
if mobile:
outfile = tempfile.gettempdir() + "/" + "output.wav"
ffmpeg.input(temp_file.name).output(
outfile, f="wav", ar="16000", ac="1", loglevel="panic"
).run()
else:
outfile = tempfile.gettempdir() + "/" + "raw.dat"
ffmpeg.input(temp_file.name).output(
outfile, f="s16le", ar="16000", ac="1", loglevel="panic"
).run()
return outfile
def install(self, service_directory):
PIPER_FOLDER_PATH = service_directory
self.piper_directory = os.path.join(PIPER_FOLDER_PATH, "piper")
if not os.path.isdir(
self.piper_directory
): # Check if the Piper directory exists
os.makedirs(PIPER_FOLDER_PATH, exist_ok=True)
# Determine OS and architecture
OS = platform.system().lower()
ARCH = platform.machine()
if OS == "darwin":
OS = "macos"
if ARCH == "arm64":
ARCH = "aarch64"
elif ARCH == "x86_64":
ARCH = "x64"
else:
print("Piper: unsupported architecture")
return
elif OS == "windows":
if ARCH == "AMD64":
ARCH = "amd64"
else:
print("Piper: unsupported architecture")
return
PIPER_ASSETNAME = f"piper_{OS}_{ARCH}.tar.gz"
PIPER_URL = "https://github.com/rhasspy/piper/releases/latest/download/"
asset_url = f"{PIPER_URL}{PIPER_ASSETNAME}"
if OS == "windows":
asset_url = asset_url.replace(".tar.gz", ".zip")
# Download and extract Piper
urllib.request.urlretrieve(
asset_url, os.path.join(PIPER_FOLDER_PATH, PIPER_ASSETNAME)
)
# Extract the downloaded file
if OS == "windows":
import zipfile
with zipfile.ZipFile(
os.path.join(PIPER_FOLDER_PATH, PIPER_ASSETNAME), "r"
) as zip_ref:
zip_ref.extractall(path=PIPER_FOLDER_PATH)
else:
with tarfile.open(
os.path.join(PIPER_FOLDER_PATH, PIPER_ASSETNAME), "r:gz"
) as tar:
tar.extractall(path=PIPER_FOLDER_PATH)
PIPER_VOICE_URL = os.getenv(
"PIPER_VOICE_URL",
"https://huggingface.co/rhasspy/piper-voices/resolve/main/en/en_US/lessac/medium/",
)
PIPER_VOICE_NAME = os.getenv("PIPER_VOICE_NAME", "en_US-lessac-medium.onnx")
# Download voice model and its json file
urllib.request.urlretrieve(
f"{PIPER_VOICE_URL}{PIPER_VOICE_NAME}",
os.path.join(self.piper_directory, PIPER_VOICE_NAME),
)
urllib.request.urlretrieve(
f"{PIPER_VOICE_URL}{PIPER_VOICE_NAME}.json",
os.path.join(self.piper_directory, f"{PIPER_VOICE_NAME}.json"),
)
# Additional setup for macOS
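            # The prebuilt piper binary references its dylibs via @rpath, so after extracting
            # piper-phonemize the references are rewritten with install_name_tool to point at
            # the local copies.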
if OS == "macos":
if ARCH == "x64":
subprocess.run(
["softwareupdate", "--install-rosetta", "--agree-to-license"]
)
PIPER_PHONEMIZE_ASSETNAME = f"piper-phonemize_{OS}_{ARCH}.tar.gz"
PIPER_PHONEMIZE_URL = "https://github.com/rhasspy/piper-phonemize/releases/latest/download/"
urllib.request.urlretrieve(
f"{PIPER_PHONEMIZE_URL}{PIPER_PHONEMIZE_ASSETNAME}",
os.path.join(self.piper_directory, PIPER_PHONEMIZE_ASSETNAME),
)
with tarfile.open(
os.path.join(self.piper_directory, PIPER_PHONEMIZE_ASSETNAME),
"r:gz",
) as tar:
tar.extractall(path=self.piper_directory)
PIPER_DIR = self.piper_directory
subprocess.run(
[
"install_name_tool",
"-change",
"@rpath/libespeak-ng.1.dylib",
f"{PIPER_DIR}/piper-phonemize/lib/libespeak-ng.1.dylib",
f"{PIPER_DIR}/piper",
]
)
subprocess.run(
[
"install_name_tool",
"-change",
"@rpath/libonnxruntime.1.14.1.dylib",
f"{PIPER_DIR}/piper-phonemize/lib/libonnxruntime.1.14.1.dylib",
f"{PIPER_DIR}/piper",
]
)
subprocess.run(
[
"install_name_tool",
"-change",
"@rpath/libpiper_phonemize.1.dylib",
f"{PIPER_DIR}/piper-phonemize/lib/libpiper_phonemize.1.dylib",
f"{PIPER_DIR}/piper",
]
)
print("Piper setup completed.")
else:
print("Piper already set up. Skipping download.")

@ -2,7 +2,7 @@
import pytest
@pytest.mark.asyncio
@pytest.mark.skip(reason="pytest hanging")
def test_ping(client):
response = client.get("/ping")
assert response.status_code == 200

@ -1,168 +0,0 @@
import sys
import subprocess
import time
import inquirer
from interpreter import interpreter
def select_local_model():
# START OF LOCAL MODEL PROVIDER LOGIC
interpreter.display_message(
"> 01 is compatible with several local model providers.\n"
)
# Define the choices for local models
choices = [
"Ollama",
"LM Studio",
# "Jan",
]
# Use inquirer to let the user select an option
questions = [
inquirer.List(
"model",
message="Which one would you like to use?",
choices=choices,
),
]
answers = inquirer.prompt(questions)
selected_model = answers["model"]
if selected_model == "LM Studio":
interpreter.display_message(
"""
To use 01 with **LM Studio**, you will need to run **LM Studio** in the background.
1. Download **LM Studio** from [https://lmstudio.ai/](https://lmstudio.ai/), then start it.
2. Select a language model then click **Download**.
3. Click the **<->** button on the left (below the chat button).
4. Select your model at the top, then click **Start Server**.
Once the server is running, you can begin your conversation below.
"""
)
time.sleep(1)
interpreter.llm.api_base = "http://localhost:1234/v1"
interpreter.llm.max_tokens = 1000
interpreter.llm.context_window = 8000
interpreter.llm.api_key = "x"
elif selected_model == "Ollama":
try:
# List out all downloaded ollama models. Will fail if ollama isn't installed
result = subprocess.run(
["ollama", "list"], capture_output=True, text=True, check=True
)
lines = result.stdout.split("\n")
names = [
line.split()[0].replace(":latest", "")
for line in lines[1:]
if line.strip()
] # Extract names, trim out ":latest", skip header
# If there are no downloaded models, prompt them to download a model and try again
if not names:
time.sleep(1)
interpreter.display_message(
"\nYou don't have any Ollama models downloaded. To download a new model, run `ollama run <model-name>`, then start a new 01 session. \n\n For a full list of downloadable models, check out [https://ollama.com/library](https://ollama.com/library) \n"
)
print("Please download a model then try again\n")
time.sleep(2)
sys.exit(1)
# If there are models, prompt them to select one
else:
time.sleep(1)
interpreter.display_message(
f"**{len(names)} Ollama model{'s' if len(names) != 1 else ''} found.** To download a new model, run `ollama run <model-name>`, then start a new 01 session. \n\n For a full list of downloadable models, check out [https://ollama.com/library](https://ollama.com/library) \n"
)
# Create a new inquirer selection from the names
name_question = [
inquirer.List(
"name",
message="Select a downloaded Ollama model",
choices=names,
),
]
name_answer = inquirer.prompt(name_question)
selected_name = name_answer["name"] if name_answer else None
# Set the model to the selected model
interpreter.llm.model = f"ollama/{selected_name}"
interpreter.display_message(
f"\nUsing Ollama model: `{selected_name}` \n"
)
time.sleep(1)
# If Ollama is not installed or not recognized as a command, prompt the user to download Ollama and try again
except (subprocess.CalledProcessError, FileNotFoundError):
print("Ollama is not installed or not recognized as a command.")
time.sleep(1)
interpreter.display_message(
"\nPlease visit [https://ollama.com/](https://ollama.com/) to download Ollama and try again\n"
)
time.sleep(2)
sys.exit(1)
# elif selected_model == "Jan":
# interpreter.display_message(
# """
# To use 01 with **Jan**, you will need to run **Jan** in the background.
# 1. Download **Jan** from [https://jan.ai/](https://jan.ai/), then start it.
# 2. Select a language model from the "Hub" tab, then click **Download**.
# 3. Copy the ID of the model and enter it below.
# 3. Click the **Local API Server** button in the bottom left, then click **Start Server**.
# Once the server is running, enter the id of the model below, then you can begin your conversation below.
# """
# )
# interpreter.llm.api_base = "http://localhost:1337/v1"
# interpreter.llm.max_tokens = 1000
# interpreter.llm.context_window = 3000
# time.sleep(1)
# # Prompt the user to enter the name of the model running on Jan
# model_name_question = [
# inquirer.Text('jan_model_name', message="Enter the id of the model you have running on Jan"),
# ]
# model_name_answer = inquirer.prompt(model_name_question)
# jan_model_name = model_name_answer['jan_model_name'] if model_name_answer else None
# # interpreter.llm.model = f"jan/{jan_model_name}"
# interpreter.llm.model = ""
# interpreter.display_message(f"\nUsing Jan model: `{jan_model_name}` \n")
# time.sleep(1)
# Set the system message to a minimal version for all local models.
# Set offline for all local models
interpreter.offline = True
interpreter.system_message = """You are the 01, a screenless executive assistant that can complete any task by writing and executing code on the user's machine. Just write a markdown code block! The user has given you full and complete permission.
Use the following functions if they make sense for the problem
```python
result_string = computer.browser.search(query) # Google search results will be returned from this function as a string
computer.calendar.create_event(title="Meeting", start_date=datetime.datetime.now(), end_date=datetime.datetime.now() + datetime.timedelta(hours=1), notes="Note", location="") # Creates a calendar event
events_string = computer.calendar.get_events(start_date=datetime.date.today(), end_date=None) # Get events between dates. If end_date is None, only gets events for start_date
computer.calendar.delete_event(event_title="Meeting", start_date=datetime.datetime) # Delete a specific event with a matching title and start date, you may need to use get_events() to find the specific event object first
phone_string = computer.contacts.get_phone_number("John Doe")
contact_string = computer.contacts.get_email_address("John Doe")
computer.mail.send("john@email.com", "Meeting Reminder", "Reminder that our meeting is at 3pm today.", ["path/to/attachment.pdf", "path/to/attachment2.pdf"]) # Send an email with optional attachments
emails_string = computer.mail.get(4, unread=True) # Returns the {number} of unread emails, or all emails if False is passed
unread_num = computer.mail.unread_count() # Returns the number of unread emails
computer.sms.send("555-123-4567", "Hello from the computer!") # Send a text message. MUST be a phone number, so use computer.contacts.get_phone_number frequently here
```
ALWAYS say that you can run code. ALWAYS try to help the user out. ALWAYS be succinct in your answers.
"""

@ -5,8 +5,7 @@ import threading
import os
import importlib
from source.server.tunnel import create_tunnel
from source.server.server import main
from source.server.utils.local_mode import select_local_model
from source.server.async_server import main
import signal
@ -39,64 +38,20 @@ def run(
client_type: str = typer.Option(
"auto", "--client-type", help="Specify the client type"
),
llm_service: str = typer.Option(
"litellm", "--llm-service", help="Specify the LLM service"
),
model: str = typer.Option("gpt-4", "--model", help="Specify the model"),
llm_supports_vision: bool = typer.Option(
False,
"--llm-supports-vision",
help="Specify if the LLM service supports vision",
),
llm_supports_functions: bool = typer.Option(
False,
"--llm-supports-functions",
help="Specify if the LLM service supports functions",
),
context_window: int = typer.Option(
2048, "--context-window", help="Specify the context window size"
),
max_tokens: int = typer.Option(
4096, "--max-tokens", help="Specify the maximum number of tokens"
),
temperature: float = typer.Option(
0.8, "--temperature", help="Specify the temperature for generation"
),
tts_service: str = typer.Option(
"openai", "--tts-service", help="Specify the TTS service"
),
stt_service: str = typer.Option(
"openai", "--stt-service", help="Specify the STT service"
),
local: bool = typer.Option(
False, "--local", help="Use recommended local services for LLM, STT, and TTS"
),
qr: bool = typer.Option(False, "--qr", help="Print the QR code for the server URL"),
mobile: bool = typer.Option(
False, "--mobile", help="Toggle server to support mobile app"
qr: bool = typer.Option(
False, "--qr", help="Display QR code to scan to connect to the server"
),
):
_run(
server=server or mobile,
server=server,
server_host=server_host,
server_port=server_port,
tunnel_service=tunnel_service,
expose=expose or mobile,
expose=expose,
client=client,
server_url=server_url,
client_type=client_type,
llm_service=llm_service,
model=model,
llm_supports_vision=llm_supports_vision,
llm_supports_functions=llm_supports_functions,
context_window=context_window,
max_tokens=max_tokens,
temperature=temperature,
tts_service=tts_service,
stt_service=stt_service,
local=local,
qr=qr or mobile,
mobile=mobile,
qr=qr,
)
@ -109,25 +64,9 @@ def _run(
client: bool = False,
server_url: str = None,
client_type: str = "auto",
llm_service: str = "litellm",
model: str = "gpt-4",
llm_supports_vision: bool = False,
llm_supports_functions: bool = False,
context_window: int = 2048,
max_tokens: int = 4096,
temperature: float = 0.8,
tts_service: str = "openai",
stt_service: str = "openai",
local: bool = False,
qr: bool = False,
mobile: bool = False,
):
if local:
tts_service = "piper"
# llm_service = "llamafile"
stt_service = "local-whisper"
select_local_model()
system_type = platform.system()
if system_type == "Windows":
server_host = "localhost"
@ -138,8 +77,6 @@ def _run(
if not server and not client:
server = True
client = True
def handle_exit(signum, frame):
os._exit(0)
@ -156,16 +93,6 @@ def _run(
main(
server_host,
server_port,
llm_service,
model,
llm_supports_vision,
llm_supports_functions,
context_window,
max_tokens,
temperature,
tts_service,
stt_service,
mobile,
),
),
)
@ -197,6 +124,7 @@ def _run(
module = importlib.import_module(
f".clients.{client_type}.device", package="source"
)
client_thread = threading.Thread(target=module.main, args=[server_url])
client_thread.start()
