pull/299/head
killian 5 months ago
parent feddecdcc0
commit a7f381cb36

@ -1,4 +1,4 @@
# Takeaways
# Context
1. **Be minimal.**
@ -6,7 +6,7 @@ Both to developers (the 01 should be very programmer friendly) and to the end us
2. **Develop standards.**
That should be compatible with other popular systems. For example, I think [LMC messages](https://docs.openinterpreter.com/protocols/lmc-messages) should ~ work on OpenAI's API and vice versa.
That should be compatible with other popular systems.
3. **Resonate strongly with a niche.**

@ -1,21 +1,13 @@
# ●
**01 is the world's first open-source Language Model Computer (LMC). 01OS is the operating system that powers it**
There are many ways to contribute, from helping others on [Github](https://github.com/KillianLucas/01/issues) or [Discord](https://discord.gg/Hvz9Axh84z), writing documentation, or improving code.
We depend on contributors like you. Let's build this.
## What should I work on?
Please pick up a task from our [roadmap](https://github.com/KillianLucas/01/blob/main/ROADMAP.md) or work on solving an [issue](https://github.com/KillianLucas/01/issues).
Please pick up a task from [issues](https://github.com/KillianLucas/01/issues).
If you encounter a bug or have a feature in mind, [search if an issue already exists](https://docs.github.com/en/github/searching-for-information-on-github/searching-on-github/searching-issues-and-pull-requests#search-by-the-title-body-or-comments). If a related issue doesn't exist, please [open a new issue](https://github.com/KillianLucas/01/issues/new/choose).
## Philosophy
01OS embodies a philosophy of breaking free from technological limitations and knowledge gaps: it leverages AI for intuitive, natural-language interaction, democratizes access to compute through open-source flexibility, and transforms devices into responsive, human-centric computing tools.
# Contribution Guidelines
1. Before taking on significant code changes, please discuss your ideas on [Discord](https://discord.gg/Hvz9Axh84z) to ensure they align with our vision. We want to keep the codebase simple and unintimidating for new users.
@ -31,31 +23,13 @@ We will review PRs when possible and work with you to integrate your contributio
Once you've forked the code and created a new branch for your work, you can run the fork by following these steps:
1. CD into the project folder `/01OS`
1. CD into the software folder `/software`
2. Install dependencies `poetry install`
3. Run the program `poetry run 01`
**Note**: This project uses [`black`](https://black.readthedocs.io/en/stable/index.html) and [`isort`](https://pypi.org/project/isort/) via a [`pre-commit`](https://pre-commit.com/) hook to ensure consistent code style. If you need to bypass it for some reason, you can `git commit` with the `--no-verify` flag.
### Installing New Dependencies
If you wish to install new dependencies into the project, please use `poetry add package-name`.
### Installing Developer Dependencies
If you need to install dependencies specific to development, like testing tools, formatting tools, etc. please use `poetry add package-name --group dev`.
### Known Issues
For some users, `poetry install` might hang on certain dependencies. As a first step, try running the following command in your terminal:
`export PYTHON_KEYRING_BACKEND=keyring.backends.fail.Keyring`
Then run `poetry install` again. If this doesn't work, please join our [Discord community](https://discord.gg/Hvz9Axh84z) for help.
## Code Formatting and Linting
Our project uses `black` for code formatting and `isort` for import sorting. To ensure consistency across contributions, please adhere to the following guidelines:
Our project uses [`black`](https://black.readthedocs.io/en/stable/index.html) for code formatting and [`isort`](https://pypi.org/project/isort/) for import sorting via a [`pre-commit`](https://pre-commit.com/) hook to ensure consistent code style across contributions. Please adhere to the following guidelines:
1. **Install Pre-commit Hooks**:
@ -63,7 +37,6 @@ Our project uses `black` for code formatting and `isort` for import sorting. To
```bash
cd software # Change into `software` directory if not there already.
poetry shell # It's better to do it within the virtual environment of your project
poetry add --dev pre-commit # Install pre-commit as a dev dependency
pre-commit install
```
@ -79,6 +52,22 @@ Our project uses `black` for code formatting and `isort` for import sorting. To
isort .
```
3. **Bypassing**:
If you need to bypass this for some reason, you can `git commit` with the `--no-verify` flag.
### Installing New Dependencies
If you wish to install new dependencies into the project, please use `poetry add package-name`.
### Known Issues
For some users, `poetry install` might hang on certain dependencies. As a first step, try running the following command in your terminal:
`export PYTHON_KEYRING_BACKEND=keyring.backends.fail.Keyring`
Then run `poetry install` again. If this doesn't work, please join our [Discord community](https://discord.gg/Hvz9Axh84z) for help.
# Licensing
Contributions to 01 are under AGPL.

@ -1,21 +0,0 @@
**The 01 Project** is comprised of the following goals, to be completed by _February 23rd, 2024_:
<br>
# 1. Create a blueprint
We will create a blueprint for a LMC (Language Model Computer) called the 01.
<br>
# 2. Publish a family of protocols
We will publish protocols to advance the LMC ecosystem.
<br>
# 3. Film a compelling video
This video will showcase the 01.
<br>
# 4. Build a physical device
Everyone on the core team will receive a functional device.

@ -1,7 +0,0 @@
| | |
|---|---|
| ![Image 13](https://github.com/KillianLucas/01/assets/63927363/7e7c179d-f0f7-4dd3-a3a0-6a750ba86f17) | ![Image 4](https://github.com/KillianLucas/01/assets/63927363/a920b172-179b-48ad-b21b-aa016955ee93) |
| ![Image 9](https://github.com/KillianLucas/01/assets/63927363/18c4a7d7-ce15-4597-ad90-28d0133321dd) | ![Image 8](https://github.com/KillianLucas/01/assets/63927363/d93bb4b0-dada-41c2-94aa-e156f40e4e00) |
| ![Image 7](https://github.com/KillianLucas/01/assets/63927363/cae5fa56-3016-4d5c-a2d9-2d1a0bb8ead7) | ![Image 6](https://github.com/KillianLucas/01/assets/63927363/7c502082-336b-436b-ab69-605878451592) |
| ![Image 5](https://github.com/KillianLucas/01/assets/63927363/bcaafacd-8af0-42a0-a3d5-91b1f1769311) | ![Image 10](https://github.com/KillianLucas/01/assets/63927363/9d1fc091-d19a-4b22-9866-90a0711e0f3d) |
| ![Image 3](https://github.com/KillianLucas/01/assets/63927363/51c0f95d-f8b7-4e2e-b4f4-f8beea219b88) | |

@ -4,7 +4,7 @@
<a href="https://discord.gg/Hvz9Axh84z"><img alt="Discord" src="https://img.shields.io/discord/1146610656779440188?logo=discord&style=social&logoColor=black"/></a>
<br>
<br>
<strong>The open-source language model computer.</strong><br>
<strong>The #1 open-source voice interface.</strong><br>
<br><a href="https://changes.openinterpreter.com">Get Updates</a> | <a href="https://01.openinterpreter.com/">Documentation</a><br>
</p>
@ -26,14 +26,20 @@ We want to help you build. [Apply for 1-on-1 support.](https://0ggfznkwh4j.typef
> [!IMPORTANT]
> This experimental project is under rapid development and lacks basic safeguards. Until a stable `1.0` release, only run this repository on devices without sensitive information or access to paid services.
>
> **A substantial rewrite to address these concerns and more, including the addition of [RealtimeTTS](https://github.com/KoljaB/RealtimeTTS) and [RealtimeSTT](https://github.com/KoljaB/RealtimeSTT), is occurring [here](https://github.com/KillianLucas/01-rewrite/tree/main).**
<br>
**The 01 Project** is building an open-source ecosystem for AI devices.
The **01** is an open-source platform for conversational devices, inspired by the *Star Trek* computer.
Our flagship operating system can power conversational devices like the Rabbit R1, Humane Pin, or [Star Trek computer](https://www.youtube.com/watch?v=1ZXugicgn6U).
With [Open Interpreter](https://github.com/OpenInterpreter/open-interpreter) at its core, the **01** is more natural, flexible, and capable than its predecessors. Assistants built on **01** can:
- Execute code
- Browse the web
- Read and create files
- Control third-party software
- ...
<br>
We intend to become the GNU/Linux of this space by staying open, modular, and free.
@ -59,7 +65,7 @@ poetry run 01 # Runs the 01 Light simulator (hold your spacebar, speak, release)
<br>
**The [RealtimeTTS](https://github.com/KoljaB/RealtimeTTS) and [RealtimeSTT](https://github.com/KoljaB/RealtimeSTT) libraries in the incoming 01-rewrite are thanks to the state-of-the-art voice interface work of [Kolja Beigel](https://github.com/KoljaB). Please star those repos and consider contributing to / utilizing those projects!**
**Note:** The [RealtimeTTS](https://github.com/KoljaB/RealtimeTTS) and [RealtimeSTT](https://github.com/KoljaB/RealtimeSTT) libraries at the heart of the 01 are thanks to the voice interface work of [Kolja Beigel](https://github.com/KoljaB). Please star those repos and consider contributing to / utilizing those projects.
# Hardware
@ -73,7 +79,7 @@ poetry run 01 # Runs the 01 Light simulator (hold your spacebar, speak, release)
# What does it do?
The 01 exposes a speech-to-speech websocket at `localhost:10001`.
The 01 exposes a speech-to-speech websocket at `localhost:10101`.
If you stream raw audio bytes to `/` in [Streaming LMC format](https://docs.openinterpreter.com/guides/streaming-response), you will receive its response in the same format.
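For illustration, here is a minimal client sketch in Python. It assumes the updated default port above (`10101`), the `websockets` library, and the start/bytes/end flag pattern used by the reference clients in this repository; those clients also send an initial `{"auth": True}` message, so include it if your server requires it. Treat it as a sketch, not the canonical client.

```python
# Minimal sketch of a speech-to-speech websocket client. The URL, auth message,
# and start/bytes/end flag pattern follow the reference clients in this repo.
import asyncio
import json

import websockets


async def send_recording(wav_bytes: bytes, url: str = "ws://localhost:10101"):
    async with websockets.connect(url) as ws:
        await ws.send(json.dumps({"auth": True}))  # some server versions expect this first
        # Start flag for a streaming LMC audio message
        await ws.send(json.dumps({"role": "user", "type": "audio", "format": "bytes.wav", "start": True}))
        await ws.send(wav_bytes)  # raw audio bytes
        # End flag closes the message
        await ws.send(json.dumps({"role": "user", "type": "audio", "format": "bytes.wav", "end": True}))
        # The response streams back in the same format: JSON flags plus audio bytes
        async for chunk in ws:
            if isinstance(chunk, bytes):
                print(f"received {len(chunk)} audio bytes")
            else:
                print("received:", chunk)


# Example usage:
# asyncio.run(send_recording(open("recording.wav", "rb").read()))
```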
@ -112,7 +118,7 @@ To run the server on your Desktop and connect it to your 01 Light, run the follo
```shell
brew install ngrok/ngrok/ngrok
ngrok authtoken ... # Use your ngrok authtoken
poetry run 01 --server --expose
poetry run 01 --server light --expose
```
The final command will print a server URL. You can enter this into your 01 Light's captive WiFi portal to connect to your 01 Server.
@ -120,11 +126,9 @@ The final command will print a server URL. You can enter this into your 01 Light
## Local Mode
```
poetry run 01 --local
poetry run 01 --profile local.py
```
If you want to run local speech-to-text using Whisper, you must install Rust. Follow the instructions given [here](https://www.rust-lang.org/tools/install).
## Customizations
To customize the behavior of the system, edit the [system message, model, skills library path,](https://docs.openinterpreter.com/settings/all-settings) etc. in the `profiles` directory under the `server` directory. This file sets up an interpreter, and is powered by Open Interpreter.
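As an illustration, a profile is just a Python file that configures an `interpreter` object. The sketch below is hypothetical: the attribute names follow Open Interpreter's documented settings and the TTS engine names that appear elsewhere in this codebase, and may differ between versions.

```python
# Hypothetical profile sketch (e.g. saved in the server's `profiles` directory).
# Attribute names follow Open Interpreter's settings and may vary by version.
from interpreter import interpreter

interpreter.llm.model = "gpt-4o"                # which language model to use
interpreter.system_message += "\nBe concise."   # extend the system message
interpreter.auto_run = True                     # run generated code without confirmation
interpreter.tts = "openai"                      # TTS engine the server reads (coqui / openai / elevenlabs)
```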
@ -157,10 +161,6 @@ Visit [our roadmap](/ROADMAP.md) to see the future of the 01.
The story of devices that came before the 01.
### [Inspiration ↗](https://github.com/KillianLucas/01/tree/main/INSPIRATION.md)
Things we want to steal great ideas from.
<br>

@ -1,12 +1,13 @@
# Roadmap
Our goal is to power a billion devices with the 01OS over the next 10 years. The Cambrian explosion of AI devices.
Our goal is to power a billion devices with the 01's software over the next 10 years.
We can do that with your help. Help extend the 01OS to run on new hardware, to connect with new peripherals like GPS and cameras, and add new locally running language models to unlock use-cases for this technology that no-one has even imagined yet.
We can do that with your help. Help extend the 01 to run on new hardware, to connect with new peripherals like GPS and cameras, and to add new locally running language models to unlock use-cases for this technology that no-one has imagined.
In the coming months, we're going to release:
- [ ] Add support for Azure and PlayHT for fast latency
- [ ] An open-source language model for computer control
- [ ] A react-native app for your phone
- [ ] A hand-held device that runs fully offline.
- [ ] Speech-to-speech model support (like `gpt-4o`) instead of TTS/STT
- [ ] Implement `Ultravox`
- [ ] An open-source language model for computer control
- [ ] A hand-held device that runs fully offline.

@ -1,92 +0,0 @@
**OI**
- [ ] Finish skill library.
- [ ] Create a system message that includes instructions on how to use the skill library.
- [ ] Test it end-to-end.
- [ ] Make sure it works with computer.skills.search (it should already work)
- [ ] Create computer.skills.teach()
- [ ] Displays a tkinter message asking users to complete the task via text (eventually voice) in the most generalizable way possible. OI should use computer.mouse and computer.keyboard to fulfill each step, then save the generalized instruction as a skill. Clicking the mouse cancels teach mode. When OI invokes this skill in the future, it will just list those steps (it needs to figure out how to flexibly accomplish each step).
- [ ] Computer: "What do you want to name this skill?"
- [ ] User: Enters name in textbox
- [ ] Computer: "Whats the First Step"
- [ ] User: textbox appears types instructions
- [ ] Textbox disappears
- [ ] OI follows instruction
- [ ] "Did that work?" Yes/No?
- [ ] If No: repeat step training
- [ ] Computer: "Great! What's the next step?" ....
- [ ] Repeat until all steps of skill are completed
- [ ] Save the skill as a function whose next() steps through the user's steps (see the sketch after this list)
- [ ] Expose ^ via `01 --teach`.
- [ ] pip install 01
- [ ] Add `01 --server --expose`.
- [ ] Add --server --expose which will expose the server via something like Ngrok, display the public URL and a password, so the 01 Light can connect to it. This will let people use OI on their computer via their Light — i.e. "Check my emails" will run AppleScript on their home computer.
- [ ] Sync Interpreter/Computer between code blocks
- [ ] New default dynamic system message with computer API + skills.
- [ ] Develop default system message for executive assistant.
- [ ] Better local system message
- [ ] write good docstrings for computer API
- [ ] Inject computer API into python routine
- [ ] determine streaming LMC protocol
- [ ] include headers?
- [ ] Why is OI starting so slowly? We could use time.time() around things to track it down.
- [ ] Create moondream-powered computer.camera.
- [ ] Computer.camera.view(query) should take a picture and ask moondream the query. Defaults to "Describe this image in detail."
- [ ] Takes Picture
- [ ] Sends to describe API
- [ ] prints and returns description
- [ ] Llamafile for phi-2 + moondream
- [ ] test on rPi + Jetson (+android mini phone?)
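A rough, hypothetical sketch of the `computer.skills.teach()` loop described above, using `tkinter` dialogs as a stand-in for the planned textbox UI and a placeholder in place of the real `computer.mouse` / `computer.keyboard` calls. All names here are illustrative, not the 01 API.

```python
# Illustrative teach-mode loop: prompt for a skill name, ask for steps one at a
# time, "perform" each step, confirm it worked, and save the steps for replay.
import json
import tkinter as tk
from tkinter import messagebox, simpledialog


def follow_instruction(step: str):
    # Placeholder: the real version would have the model drive
    # computer.mouse / computer.keyboard to carry out the step.
    print(f"(pretending to perform) {step}")


def teach():
    root = tk.Tk()
    root.withdraw()  # show only dialogs, no main window

    name = simpledialog.askstring("Teach", "What do you want to name this skill?") or "unnamed_skill"
    steps = []
    prompt = "What's the first step?"
    while True:
        step = simpledialog.askstring("Teach", prompt)
        if not step:  # cancel or empty input ends teach mode
            break
        follow_instruction(step)
        if messagebox.askyesno("Teach", "Did that work?"):
            steps.append(step)
            prompt = "Great! What's the next step? (Cancel to finish)"
        else:
            prompt = "Let's retry. Rephrase that step:"

    # Save the recorded steps so the skill can replay them later.
    with open(f"{name}.json", "w") as f:
        json.dump({"name": name, "steps": steps}, f)


if __name__ == "__main__":
    teach()
```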
**OS**
- [ ] Queue speech results
- [ ] TTS sentences should be queued + playback should stop once button is pressed
- [ ] expose server using Ngrok
- [ ] Swap out the current hosted functions for local ones.
- [ ] TTS — Piper? OpenVoice? Rasspy?
- [ ] STT — Whisper? Canary?
- [ ] LLM — Phi-2 Q4 Llamafile, just need to download it, OI knows how to use Llamafiles
- [ ] Functional Requirements
- [ ] for initial user setup and first experience
- [ ] If Light and no internet, open a captive wifi page with text boxes: Wifi Name, Wifi Pass, (optional) Server URL, (optional) Server Pass
- [ ] in device.py
- [ ] Camera input from user in device.py
- [ ] Can tapping the mic twice = trigger pressing the "button"? Simple sensing, just based on volume spikes?
- [ ] Update Architecture
- [ ] Base Device Class
- [ ] Separate folders for Raspberry Pi, Desktop, Droid, App, Web
- [ ] device.py for each folder has input logic for that device
- [ ] Add basic TUI to device.py. Just renders messages and lets you add messages. Can easily copy OI's TUI.
- [ ] index.html for each folder has user interface for that device
- [ ] Web is just index.html
- [ ] Display.html? gui.html?
- [ ] Replace bootloader and boot script— should just run 01, full screen TUI.
- [ ] Package it as an ISO, or figure out some other simple install instructions. How to easily install on a Pi?
**Hardware**
- [ ] (Hardware and software) Get the 01OS working on the **Jetson** or Pi. Pick one to move forward with.
- [ ] Connect the Seeed Sense (ESP32 with Wifi, Bluetooth and a mic) to a small DAC + amplifier + speaker.
- [ ] Connect the Seeed Sense to a battery.
- [ ] Configure the ESP32 to be a wireless mic + speaker for the Jetson or Pi.
- [ ] Connect the Jetson or Pi to a battery.
- [ ] Make a rudimentary case for the Seeed Sense + speaker. Optional.
- [ ] Make a rudimentary case for the Jetson or Pi. Optional.
**Release Day**
- [ ] Launch video "cambriah explosion" 3d Sketch
- [ ] Create form to get pre-release feedback from 200 interested people (who responded to Killian's tweet)
**DONE**
- [ ] Get Local TTS working on Mac [Shiven]
- [ ] Get Local STT working on Mac [Zohaib + Shiven]
- [ ] Debug level logging/printing [Tom]
- [ ] Get hardware (mic, speaker, button) working on the rPi (running on battery) [Ty]
- [ ] device.py conditionals for platform [Ty]
- [ ] Kernel filtering issues [Tom]
- [ ] .env file [Tom]
- [ ] Save computer messages in User.json [Kristijan]
- [ ] Service Management [Zach]

@ -1,69 +0,0 @@
# Teams
## Hardware
- Ben @humanbee
- Ty @tyisfly
- Use Michael as a recruitor
- Shiven @shivenmian
- Jacob Weisel
- Aniket @atneik
- ..?
## Software
- Audio (TTS / STT)
- Tasks: Streaming audio both ways.
- Hardware limitations. What's the smallest hardware this can be on?
- Zach @zwf
- Zohaib @Zabirauf
- Atai @atai_copilotkit
- OI Core
- Tasks: Computer API (schedule thing), skill library
- Hristijan @thekeyq
- Aakash @ashgam.\_
- Aniket @atneik
- Shiven @shivenmian
- Ty @tyisfly
- Killian @killianlucas
- OS (formerly 'Linux / Firmware')
- Tasks: Virtualization? ISO? Putting sensors around the OS to put files into the queue. Bootloader. Networked input into the queue
- Shiven @shivenmian
- Hristijan @thekeyq
- Michael @mjjt
- Zohaib @Zabirauf
## Experience
- Design
- Arturo @arturot
- Ronith @ronithk
- Danny @dannytayara
- Killian @killianlucas
- Aniket @atneik
- Alim
- Eschwa?
- Industrial
- Interface
- Web
- Brand / Video
- Arturo @arturot
- Killian @killianlucas
- Matt @matt_rygh
- Finn
- Research
- Ben @humanbee
- Use-cases
- Tasks: Send out typeform—what are motivating examples?
- Testing
- Uli @ulidabess
## Comms
- Uli @ulidabess
- Discord Community
- Twitter Presence
- Killian @killianlucas
- Press
- Michael @mjjt
- Zach (connection at NYT) @zwf

@ -1,6 +1,4 @@
# 01OS
## Contribute
## Contribute to these docs
- Clone this repo
- install mintlify CLI
- run mintlify in project directory to get preview of docs

@ -1,4 +0,0 @@
One of the core principles of software development is DRY (Don't Repeat
Yourself). This is a principle that applies to documentation as
well. If you find yourself repeating the same content in multiple places, you
should consider creating a custom snippet to keep your content in sync.


@ -1,3 +0,0 @@
- Uli @ulidabess
- Ben @humanbee
- Killian @killianlucas

@ -1,3 +0,0 @@
- Michael @mjjt
- Zach @zwf (connection at NYT)
- Killian @killianlucas

@ -1,7 +0,0 @@
- Arturo @arturot
- Ronith @ronithk
- Danny @dannytayara
- Killian @killianlucas
- Aniket @atneik
- [Alim?](https://twitter.com/almmaasoglu)
- [ESchwaa?](https://twitter.com/ESchwaa)

@ -1,5 +0,0 @@
- [ ] What does 01OS look like when you boot it up?
- [ ] What does 01OS look like when it's running?
- [ ] What does the 01 website look like?
Awaiting hardware design decisions until the hardware team decides whether we're starting from scratch or repurposing existing hardware.

@ -1 +0,0 @@
- [ ] Send out typeform to remote team — what are motivating use-cases?

@ -1,2 +0,0 @@
- Ben @humanbee
- Uli @ulidabess

@ -1,4 +0,0 @@
- Arturo @arturot
- Killian @killianlucas
- Matt @matt_rygh
- Finn

@ -1,33 +0,0 @@
### Non-pre-made hardware
1. Raspberry Pi
2. Raspberry Pi + Coral.ai Accelerator
3. Coral.ai Devboard
### Assembly-required OSS hardware
1. [The Raspberry Pi Recovery Kit by Jay Doscher](https://www.doscher.com/work-recovery-kit/): "A MOBILE TERMINAL FOR THE END OF THE WORLD." I bet we could reach out to him and have him send some, tbh.
![JAY02105](https://github.com/KillianLucas/01/assets/63927363/14b7438f-fe4c-45ed-86ab-17538c1fc600)
### Ready to buy, OSS hardware
1. [Clockwork's uConsole](https://www.clockworkpi.com/product-page/uconsole-kit-rpi-cm4-lite)
![3833f7_9e9fc3ed88534fb0b1eae043b3d5906e~mv2](https://github.com/KillianLucas/01/assets/63927363/ae2bd1f7-ffdf-42e6-87f8-2beb7e3145c6)
2. [Clockwork's Devterm](https://www.clockworkpi.com/product-page/devterm-kit-r01)
![3833f7_4f7e8e064a984027bddff865db0ca1b7~mv2](https://github.com/KillianLucas/01/assets/63927363/ee8cbfd4-bcb1-4eac-8c4d-d864fe3a0266)
### Ready to buy, non-OSS hardware
Can we modify the OS on these things? Some are OEM, which I think means we can contact the manufacturer and ask for changes.
1. [Conference speaker](https://www.amazon.com/dp/B0CCP1J8QW/ref=sspa_dk_detail_0?psc=1&pd_rd_i=B0CCP1J8QW&pd_rd_w=0wR2S&content-id=amzn1.sym.d81b167d-1f9e-48b6-87d8-8aa5e473ea8c&pf_rd_p=d81b167d-1f9e-48b6-87d8-8aa5e473ea8c&pf_rd_r=60DJHP5JV1DJ0BJ3V7N4&pd_rd_wg=OUF4S&pd_rd_r=c4d7e254-7b9e-4025-a252-7851ef880a18&s=musical-instruments&sp_csd=d2lkZ2V0TmFtZT1zcF9kZXRhaWxfdGhlbWF0aWM)
2. [Smartwatch](https://www.amazon.com/Parsonver-Smartwatch-Bluetooth-Activity-Pedometer/dp/B0BPM16KVM/ref=sr_1_22_sspa?keywords=voice%2Bassistant%2Bandroid&qid=1706051147&sr=8-22-spons&ufe=app_do%3Aamzn1.fos.006c50ae-5d4c-4777-9bc0-4513d670b6bc&sp_csd=d2lkZ2V0TmFtZT1zcF9tdGY&th=1)
3. [Smartwatch that looks like the 01 Light](https://www.alibaba.com/product-detail/MTL135-Reloj-Android-Smartwatch-2023-Montre_1600707760136.html?spm=a2700.galleryofferlist.normal_offer.d_image.24af7083iEzmhs)
4. [Smartwatch that looks like a square 01 Light](https://www.alibaba.com/product-detail/2023-Newest-4g-Sim-Call-S8_1600898456587.html?spm=a2700.galleryofferlist.normal_offer.d_image.2e9f70836cO7ae)
5. [Mic + speaker + button](https://www.alibaba.com/product-detail/Wholesale-CHATGPT4-0-ODM-OEM-Microphone_1601008248994.html?spm=a2700.galleryofferlist.p_offer.d_title.25ec7a08qFPP5l&s=p)
6. [shit, is the 01 Heavy just a weird laptop](https://www.alibaba.com/product-detail/8-Inch-Mini-Pocket-Laptop-Tablet_1600842995304.html)

@ -1,9 +0,0 @@
- [ ] **Should we just buy pre-made hardware?**
Some bodies— like the 01 Light without a screen and with a camera, and the 01 Heavy as a screenless tape-recorder with a camera and a button on the side— do not exist. So perhaps we should make CAD files and build them.
Other bodies we've floated already exist as Android phones (isn't the Rabbit R1 basically an Android phone?) smartwatches, laptops, and cyberdecks.
I think we should decide by 1) estimating how long custom hardware would take, and 2) weighing it against how _memorable of an impression_ the 01 would make if it did/didn't have custom hardware. Holding something unique is a big part of this. But we might accomplish that by using some of the more bizarre-looking hardware.
[Check out some of the options on the table here.](OPTIONS.md)

@ -1,7 +0,0 @@
- Ben @humanbee
- Ty @tyisfly
- Shiven @shivenmian
- Jacob Weisel
- Aniket @atneik
* for later, Michael offered to recruit more to this team

@ -1,22 +0,0 @@
# Development Setup for Jetson Nano
1. Go through the tutorial here: https://developer.nvidia.com/embedded/learn/get-started-jetson-nano-devkit#intro
2. At the end of that guide, you should have a Jetson running off a power supply or micro USB.
3. Get network connectivity. The Jetson does not have a WiFi module so you will need to plug in ethernet.
If you have a laptop, you can share internet access over Ethernet.
To do this with Mac, do the following:
a. Plug a cable from the Jetson Ethernet port to your Mac (you can use an Ethernet -> USB converter for your Mac).
b. Go to General->Sharing, then click the little `(i)` icon next to "Internet Sharing", and check all the options.
![](mac-share-internet.png)
c. Go back to General->Sharing, and turn on "Internet Sharing".
![](mac-share-internet-v2.png)
d. Now the Jetson should have connectivity!


@ -1,79 +0,0 @@
# How to set up 01 on a Raspberry Pi
## Supplies needed
- Raspberry Pi 5
- Micro SD Card
- USB-C cable
- Micro HDMI to HDMI cable
- Monitor
- Keyboard
- Mouse
- USB Microphone ([like this one](https://www.amazon.com/dp/B071WH7FC6?psc=1&ref=ppx_yo2ov_dt_b_product_details))
- USB or Bluetooth speaker
- Breadboard, jumper wires, 220R resistor and button (a kit like [this one](https://www.amazon.com/Smraza-Electronics-Potentiometer-tie-Points-Breadboard/dp/B0B62RL725/ref=sr_1_20?crid=MQDBAOQU7RYY&keywords=breadboard+kit&qid=1707665692&s=electronics&sprefix=breadboard%2Celectronics%2C346&sr=1-20) has everything you need)
## SD card setup
- Flash a new sd card using [Raspberry Pi Imager](https://www.raspberrypi.com/software/)
- Pick your device (only tested on Raspberry Pi 5)
- Select the OS: scroll down to "Other General OS", then select Ubuntu Desktop 64-bit
- Select the storage: Select your sd card
- Proceed to flashing by selecting "Write"
## Hardware set up
- Connect the Raspberry Pi board to USB-C power
- Connect a keyboard, mouse, and mic to the USB ports
- Connect a monitor to the micro HDMI port
- Insert your newly flashed SD card into the slot under the device by the power button
- Power it on with the power button
- Hook up the button to the breadboard; it should look like this:
![Button](button-diagram.png)
## Ubuntu set up
- Go through the system configuration on start up:
- Make sure to connect to WiFi; we will need it to install 01 and its packages
- Choose a password you will remember, you will need it later
- Open terminal
- `sudo apt update && sudo apt upgrade -y`
- Sometimes `dpkg` will complain; if it does, run `sudo dpkg --configure -a`, then run the update and upgrade commands again
Clone the repo:
- `sudo apt install git -y`
- `git clone https://github.com/KillianLucas/01`
- `cd 01/OS/01/`
Set up a virtual environment:
- `sudo apt install python3-venv -y`
- `python3 -m venv venv`
- `source venv/bin/activate`
Install packages:
- `sudo apt install ffmpeg portaudio19-dev` (ffmpeg and portaudio19-dev need to be installed with apt on linux)
- `sudo apt-get update`
- `sudo apt-get install gpiod`
- `pip install -r requirements.txt`
- The pyaudio install might fail; these commands should fix it:
- `sudo apt-get install gcc make python3-dev portaudio19-dev`
- `pip install pyaudio`
Rename and edit the .env file:
- `mv .env.example .env` (rename the .env file)
- Add your OpenAI key to the .env file, or set it by running `export OPENAI_API_KEY="sk-..."`
- To add it to the .env in the terminal, run `nano .env`
- Add the key to the `OPENAI_API_KEY` line
- Save and exit by pressing `ctrl + x`, then `y`, then `enter`
Run the start script:
- `bash start.sh`
- There may be a few packages that didn't install, yielding a 'ModuleNotFoundError' error. If you see this, manually install each of them with pip and retry the `bash start.sh` command.
Done! You should now be able to use 01 on your Raspberry Pi 5, and use the button to invoke the assistant.


@ -1,20 +0,0 @@
# January 20th, 2024
At our first meetup, we discussed the context and future of the six-week project and I laid out [four goals](https://github.com/KillianLucas/01/blob/main/GOALS.md).
### [Presentation Slides ↗](https://www.canva.com/design/DAF56kADkyc/2IgFkCuPoUg5lmv6-gGadg/view?utm_content=DAF56kADkyc&utm_campaign=designshare&utm_medium=link&utm_source=editor)
## Whiteboards
Regarding the minimal body:
![IMG_6280](https://github.com/KillianLucas/01/assets/63927363/6e0f833a-ffab-43ff-99b3-0914ff0a34db)
Regarding the heavy body:
![IMG_6282](https://github.com/KillianLucas/01/assets/63927363/c06bd0f5-eef8-4e26-83ec-0afeaa07eab6)
## Decisions
1. We'll try to build around the use-cases, some of which [I have compiled here.](https://github.com/KillianLucas/01/blob/main/USE_CASES.md) If you think of more please make a PR.
2. We want to design two bodies to house the 01, one will be very minimal and require an internet connection (possible names: The 01 **Light**, The 01 **Click**, or The 01 **Feather**) and another will run fully locally (The 01 **Heavy**).

@ -1,3 +0,0 @@
- [ ] STT implementation — Can we get a bash script that we can run on startup that starts a whisper.cpp tiny binary with an endpoint to connect to it (or something) so script.js can stream audio to it?
- [ ] TTS implementation — Same as above ^ bash script that starts Rhasspy, then some way to connect script.js to it?
- [ ] Hardware limitations / find minimum requirements for this to be performant. What's the shittiest hardware this can be run on?

@ -1,5 +0,0 @@
- Zach @zwf
- Zohaib @Zabirauf
- Atai @atai_copilotkit
Team lead: Zach

@ -1,3 +0,0 @@
- [ ] Release Open Interpreter `0.2.1`
- [ ] Meet to determine Computer API additions for the 01
- [ ] Meet to decide how to build the skill library + skill recording

@ -1,8 +0,0 @@
- Hristijan @thekeyq
- Aakash @ashgam.\_
- Aniket @atneik
- Shiven @shivenmian
- Ty @tyisfly
- Killian @killianlucas
Team lead: Killian

@ -1,13 +0,0 @@
- [ ] Modify bootloader.
- [ ] Decide: better queue?
<br>
So, Michael suggested we simply watch and filter the `dmesg` stream (I think that's what it's called?), so I suppose we could have a script like `/01/core/kernel_watch.py` that puts things into the queue? Honestly knowing we could get it all from one place like that— maybe this should be simpler. **Is the queue folder necessary?** How about we just expect the computer to send {"role": "computer"} messages to a POST endpoint at "/queue" or maybe "/interrupt" or maybe "/" but with POST? When it gets those it puts them in the redis queue, which is checked frequently, so it's handled immediately. So then yeah, maybe we do have redis there, then instead of looking at that folder, we check the redis queue. Is this better for any reason? Making the way computer messages are sent = an HTTP request, not putting a file in a folder? (See the sketch after this list.)
- [ ] Virtualization?
- [ ] Best workflow for pressing to an ISO? Cubic?
- [ ] Putting sensors around the OS to put things into the queue / `dmesg` implementation.
- [ ] Networked input into the queue? (Exploring this makes me think the "/queue" or something endpoint is smarter to do than the "queue" folder)
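A rough sketch of the POST-endpoint idea above, assuming a hypothetical FastAPI app and a Redis list named `queue`. The endpoint, key names, and framework choice are illustrative, not part of the 01 codebase.

```python
# Illustrative only: accept {"role": "computer"} LMC messages over HTTP and push
# them onto a Redis list for whatever is polling the queue to handle.
import json

import redis
from fastapi import FastAPI, Request

app = FastAPI()
r = redis.Redis()


@app.post("/queue")
async def enqueue(request: Request):
    message = await request.json()  # e.g. {"role": "computer", "type": "message", "content": "..."}
    r.rpush("queue", json.dumps(message))
    return {"queued": True}
```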
# For later
- [ ] We could have `/i` which other interpreters hit. That behaves more like the OpenAI POST endpoint with stream=True by default (I think this is important for users to see the exchange happening in real time, streaming `event/stream` or whatever). You could imagine some kind of handshake — another interpreter → my interpreter's /i → the sender is unrecognized → computer message is sent to /, prompting AI to ask the user to have the sending interpreter send a specific code → the user tells the sending interpreter to use that specific code → the sender is recognized and added to friends-list (`computer.inetwork.friends()`) → now they can hit each other's i endpoints freely with `computer.inetwork.friend(id).message("hey")`.
- [ ] (OS team: this will require coordination with the OI core team, so let's talk about it / I'll explain at the next meetup.) When transferring skills that require OS control, the sender can replace those skills with that command, with one input "natural language query" (?) preceded by the skill function name or something like that. Basically so if you ask it to do something you set up as a skill, it actually asks your computer to do it. If you ask your computer to do it directly, it's more direct.

@ -1,5 +0,0 @@
- Shiven @shivenmian
- Hristijan @thekeyq
- Killian @killianlucas
- Michael @mjjt
- Zohaib @Zabirauf

@ -1,36 +0,0 @@
import subprocess
import sys
import ctypes
import os


def main():
    """Run pytest in the software directory.

    This script is intended to be used as a pre-commit hook to run the tests from the root of the repository.
    """
    # Additional setup for Windows (10 at least) to prevent issues with Unicode characters in the console.
    # see https://www.reddit.com/r/learnpython/comments/350c8c/unicode_python_3_and_the_windows_console/
    if sys.platform.startswith("win"):
        # Force UTF-8 encoding in Python
        os.environ["PYTHONUTF8"] = "1"
        # Change Windows console code page to UTF-8
        ctypes.windll.kernel32.SetConsoleCP(65001)
        ctypes.windll.kernel32.SetConsoleOutputCP(65001)

    # Define the target directory relative to this script location.
    target_directory = os.path.join(os.path.dirname(__file__), "software")
    os.chdir(target_directory)

    # Run pytest with any additional arguments passed to this script.
    result = subprocess.run(["pytest"] + sys.argv[1:])

    # Exit with pytest's exit code to reflect the test outcome in the pre-commit hook.
    sys.exit(result.returncode)


if __name__ == "__main__":
    main()

@ -1,3 +0,0 @@
_archive
__pycache__
.idea

@ -11,14 +11,17 @@
... --qr # Displays a qr code
"""
from yaspin import yaspin
spinner = yaspin()
spinner.start()
import typer
import ngrok
import platform
import threading
import os
import importlib
from source.server.tunnel import create_tunnel
from source.server.async_server import start_server
from source.server.server import start_server
import subprocess
import socket
import json
@ -124,11 +127,14 @@ def run(
if server == "light":
light_server_port = server_port
voice = True # The light server will support voice
elif server == "livekit":
# The light server should run at a different port if we want to run a livekit server
spinner.stop()
print(f"Starting light server (required for livekit server) on the port before `--server-port` (port {server_port-1}), unless the `AN_OPEN_PORT` env var is set.")
print(f"The livekit server will be started on port {server_port}.")
light_server_port = os.getenv('AN_OPEN_PORT', server_port-1)
voice = False # The light server will NOT support voice. It will just run Open Interpreter. The Livekit server will handle voice
server_thread = threading.Thread(
target=start_server,
@ -136,9 +142,12 @@ def run(
server_host,
light_server_port,
profile,
voice,
debug
),
)
spinner.stop()
print("Starting server...")
server_thread.start()
threads.append(server_thread)
@ -164,7 +173,7 @@ def run(
# Start the livekit worker
worker_thread = threading.Thread(
target=run_command, args=("python worker.py dev",) # TODO: This should not be a CLI, it should just run the python file
target=run_command, args=("python source/server/livekit/worker.py dev",) # TODO: This should not be a CLI, it should just run the python file
)
time.sleep(7)
worker_thread.start()
@ -208,6 +217,8 @@ def run(
)
client_thread = threading.Thread(target=module.run, args=[server_url, debug])
spinner.stop()
print("Starting client...")
client_thread.start()
threads.append(client_thread)

software/poetry.lock (generated): 1,201 lines changed. File diff suppressed because it is too large.

@ -3,51 +3,15 @@ name = "01OS"
packages = [
{include = "source"},
]
include = ["start.py"]
include = ["main.py"]
version = "0.0.14"
description = "The open-source language model computer"
description = "The #1 open-source voice interface for desktop, mobile, and ESP32 chips."
authors = ["Killian <killian@openinterpreter.com>"]
license = "AGPL"
readme = "../README.md"
[tool.poetry.dependencies]
python = ">=3.9,<3.12"
pyaudio = "^0.2.14"
pynput = "^1.7.6"
websockets = "^12.0"
python-dotenv = "^1.0.1"
ffmpeg-python = "^0.2.0"
textual = "^0.50.1"
pydub = "^0.25.1"
ngrok = "^1.0.0"
simpleaudio = "^1.0.4"
opencv-python = "^4.9.0.80"
psutil = "^5.9.8"
platformdirs = "^4.2.0"
rich = "^13.7.1"
pytimeparse = "^1.1.8"
python-crontab = "^3.0.0"
inquirer = "^3.2.4"
pyqrcode = "^1.2.1"
realtimestt = "^0.1.16"
realtimetts = { version = "^0.4.2", extras = ["all"] }
keyboard = "^0.13.5"
pyautogui = "^0.9.54"
ctranslate2 = "4.1.0"
#py3-tts = "^3.5"
#elevenlabs = "1.2.2"
groq = "^0.5.0"
open-interpreter = {git = "https://github.com/OpenInterpreter/open-interpreter.git", branch = "development", extras = ["os", "server"]}
litellm = "*"
openai = "*"
pywebview = "*"
pyobjc = "*"
sentry-sdk = "^2.4.0"
plyer = "^2.1.0"
pywinctl = "^0.3"
certifi = "^2024.7.4"
pygame = "^2.6.0"
mpv = "^1.0.7"
python = ">=3.10,<3.12"
livekit = "^0.12.1"
livekit-agents = "^0.8.6"
livekit-plugins-deepgram = "^0.6.5"
@ -55,13 +19,19 @@ livekit-plugins-openai = "^0.8.1"
livekit-plugins-silero = "^0.6.4"
livekit-plugins-elevenlabs = "^0.7.3"
segno = "^1.6.1"
open-interpreter = {extras = ["os", "server"], version = "^0.3.8"}
ngrok = "^1.4.0"
realtimetts = {extras = ["all"], version = "^0.4.5"}
realtimestt = "^0.2.41"
pynput = "^1.7.7"
yaspin = "^3.0.2"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
[tool.poetry.scripts]
01 = "start:app"
01 = "main:app"
[tool.poetry.group.dev.dependencies]
black = "^24.3.0"

@ -1,10 +0,0 @@
; Config for Pytest Runner.
; suppress Deprecation Warning and User Warning to not spam the interface, but check periodically
[pytest]
python_files = tests.py test_*.py
filterwarnings =
ignore::UserWarning
ignore::DeprecationWarning
log_cli = true
log_cli_level = INFO

@ -1,482 +0,0 @@
from dotenv import load_dotenv
load_dotenv() # take environment variables from .env.
import subprocess
import os
import sys
import asyncio
import threading
import pyaudio
from pynput import keyboard
import json
import traceback
import websockets
import queue
from pydub import AudioSegment
from pydub.playback import play
import time
import wave
import tempfile
from datetime import datetime
import cv2
import base64
import platform
from interpreter import (
interpreter,
) # Just for code execution. Maybe we should let people do from interpreter.computer import run?
# In the future, I guess kernel watching code should be elsewhere? Somewhere server / client agnostic?
from ..server.utils.kernel import put_kernel_messages_into_queue
from ..server.utils.get_system_info import get_system_info
from ..server.utils.process_utils import kill_process_tree
from ..server.utils.logs import setup_logging
from ..server.utils.logs import logger
setup_logging()
os.environ["STT_RUNNER"] = "server"
os.environ["TTS_RUNNER"] = "server"
from ..utils.accumulator import Accumulator
accumulator = Accumulator()
# Configuration for Audio Recording
CHUNK = 1024 # Record in chunks of 1024 samples
FORMAT = pyaudio.paInt16 # 16 bits per sample
CHANNELS = 1 # Mono
RATE = 16000 # Sample rate
RECORDING = False # Flag to control recording state
SPACEBAR_PRESSED = False # Flag to track spacebar press state
# Camera configuration
CAMERA_ENABLED = os.getenv("CAMERA_ENABLED", False)
if type(CAMERA_ENABLED) == str:
CAMERA_ENABLED = CAMERA_ENABLED.lower() == "true"
CAMERA_DEVICE_INDEX = int(os.getenv("CAMERA_DEVICE_INDEX", 0))
CAMERA_WARMUP_SECONDS = float(os.getenv("CAMERA_WARMUP_SECONDS", 0))
# Specify OS
current_platform = get_system_info()
def is_win11():
return sys.getwindowsversion().build >= 22000
def is_win10():
try:
return (
platform.system() == "Windows"
and "10" in platform.version()
and not is_win11()
)
except:
return False
# Initialize PyAudio
p = pyaudio.PyAudio()
send_queue = queue.Queue()
class Device:
def __init__(self):
self.pressed_keys = set()
self.captured_images = []
self.audiosegments = asyncio.Queue()
self.server_url = ""
self.ctrl_pressed = False
self.tts_service = ""
self.debug = False
self.playback_latency = None
def fetch_image_from_camera(self, camera_index=CAMERA_DEVICE_INDEX):
"""Captures an image from the specified camera device and saves it to a temporary file. Adds the image to the captured_images list."""
image_path = None
cap = cv2.VideoCapture(camera_index)
ret, frame = cap.read() # Capture a single frame to initialize the camera
if CAMERA_WARMUP_SECONDS > 0:
# Allow camera to warm up, then snap a picture again
# This is a workaround for some cameras that don't return a properly exposed
# picture immediately when they are first turned on
time.sleep(CAMERA_WARMUP_SECONDS)
ret, frame = cap.read()
if ret:
temp_dir = tempfile.gettempdir()
image_path = os.path.join(
temp_dir, f"01_photo_{datetime.now().strftime('%Y%m%d%H%M%S%f')}.png"
)
self.captured_images.append(image_path)
cv2.imwrite(image_path, frame)
logger.info(f"Camera image captured to {image_path}")
logger.info(
f"You now have {len(self.captured_images)} images which will be sent along with your next audio message."
)
else:
logger.error(
f"Error: Couldn't capture an image from camera ({camera_index})"
)
cap.release()
return image_path
def encode_image_to_base64(self, image_path):
"""Encodes an image file to a base64 string."""
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode("utf-8")
def add_image_to_send_queue(self, image_path):
"""Encodes an image and adds an LMC message to the send queue with the image data."""
base64_image = self.encode_image_to_base64(image_path)
image_message = {
"role": "user",
"type": "image",
"format": "base64.png",
"content": base64_image,
}
send_queue.put(image_message)
# Delete the image file from the file system after sending it
os.remove(image_path)
def queue_all_captured_images(self):
"""Queues all captured images to be sent."""
for image_path in self.captured_images:
self.add_image_to_send_queue(image_path)
self.captured_images.clear() # Clear the list after sending
async def play_audiosegments(self):
"""Plays them sequentially."""
if self.tts_service == "elevenlabs":
print("Ensure `mpv` in installed to use `elevenlabs`.\n\n(On macOSX, you can run `brew install mpv`.)")
mpv_command = ["mpv", "--no-cache", "--no-terminal", "--", "fd://0"]
mpv_process = subprocess.Popen(
mpv_command,
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
while True:
try:
audio = await self.audiosegments.get()
if self.debug and self.playback_latency and isinstance(audio, bytes):
elapsed_time = time.time() - self.playback_latency
print(f"Time from request to playback: {elapsed_time} seconds")
self.playback_latency = None
if self.tts_service == "elevenlabs":
mpv_process.stdin.write(audio) # type: ignore
mpv_process.stdin.flush() # type: ignore
else:
play(audio)
await asyncio.sleep(0.1)
except asyncio.exceptions.CancelledError:
# This happens once at the start?
pass
except:
logger.info(traceback.format_exc())
def record_audio(self):
if os.getenv("STT_RUNNER") == "server":
# STT will happen on the server. we're sending audio.
send_queue.put(
{"role": "user", "type": "audio", "format": "bytes.wav", "start": True}
)
elif os.getenv("STT_RUNNER") == "client":
# STT will happen here, on the client. we're sending text.
send_queue.put({"role": "user", "type": "message", "start": True})
else:
raise Exception("STT_RUNNER must be set to either 'client' or 'server'.")
"""Record audio from the microphone and add it to the queue."""
stream = p.open(
format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK,
)
print("Recording started...")
global RECORDING
# Create a temporary WAV file to store the audio data
temp_dir = tempfile.gettempdir()
wav_path = os.path.join(
temp_dir, f"audio_{datetime.now().strftime('%Y%m%d%H%M%S%f')}.wav"
)
wav_file = wave.open(wav_path, "wb")
wav_file.setnchannels(CHANNELS)
wav_file.setsampwidth(p.get_sample_size(FORMAT))
wav_file.setframerate(RATE)
while RECORDING:
data = stream.read(CHUNK, exception_on_overflow=False)
wav_file.writeframes(data)
wav_file.close()
stream.stop_stream()
stream.close()
print("Recording stopped.")
if self.debug:
self.playback_latency = time.time()
duration = wav_file.getnframes() / RATE
if duration < 0.3:
# Just pressed it. Send stop message
if os.getenv("STT_RUNNER") == "client":
send_queue.put({"role": "user", "type": "message", "content": "stop"})
send_queue.put({"role": "user", "type": "message", "end": True})
else:
send_queue.put(
{
"role": "user",
"type": "audio",
"format": "bytes.wav",
"content": "",
}
)
send_queue.put(
{
"role": "user",
"type": "audio",
"format": "bytes.wav",
"end": True,
}
)
else:
self.queue_all_captured_images()
if os.getenv("STT_RUNNER") == "client":
# THIS DOES NOT WORK. We moved to this very cool stt_service, llm_service
# way of doing things. stt_wav is not a thing anymore. Needs work to work
# Run stt then send text
text = stt_wav(wav_path)
logger.debug(f"STT result: {text}")
send_queue.put({"role": "user", "type": "message", "content": text})
send_queue.put({"role": "user", "type": "message", "end": True})
else:
# Stream audio
with open(wav_path, "rb") as audio_file:
byte_data = audio_file.read(CHUNK)
while byte_data:
send_queue.put(byte_data)
byte_data = audio_file.read(CHUNK)
send_queue.put(
{
"role": "user",
"type": "audio",
"format": "bytes.wav",
"end": True,
}
)
if os.path.exists(wav_path):
os.remove(wav_path)
def toggle_recording(self, state):
"""Toggle the recording state."""
global RECORDING, SPACEBAR_PRESSED
if state and not SPACEBAR_PRESSED:
SPACEBAR_PRESSED = True
if not RECORDING:
RECORDING = True
threading.Thread(target=self.record_audio).start()
elif not state and SPACEBAR_PRESSED:
SPACEBAR_PRESSED = False
RECORDING = False
def on_press(self, key):
"""Detect spacebar press and Ctrl+C combination."""
self.pressed_keys.add(key) # Add the pressed key to the set
if keyboard.Key.space in self.pressed_keys:
self.toggle_recording(True)
elif {keyboard.Key.ctrl, keyboard.KeyCode.from_char("c")} <= self.pressed_keys:
logger.info("Ctrl+C pressed. Exiting...")
kill_process_tree()
os._exit(0)
# Windows alternative to the above
if key == keyboard.Key.ctrl_l:
self.ctrl_pressed = True
try:
if key.vk == 67 and self.ctrl_pressed:
logger.info("Ctrl+C pressed. Exiting...")
kill_process_tree()
os._exit(0)
# For non-character keys
except:
pass
def on_release(self, key):
"""Detect spacebar release and 'c' key press for camera, and handle key release."""
self.pressed_keys.discard(
key
) # Remove the released key from the key press tracking set
if key == keyboard.Key.ctrl_l:
self.ctrl_pressed = False
if key == keyboard.Key.space:
self.toggle_recording(False)
elif CAMERA_ENABLED and key == keyboard.KeyCode.from_char("c"):
self.fetch_image_from_camera()
async def message_sender(self, websocket):
while True:
message = await asyncio.get_event_loop().run_in_executor(
None, send_queue.get
)
if isinstance(message, bytes):
await websocket.send(message)
else:
await websocket.send(json.dumps(message))
send_queue.task_done()
await asyncio.sleep(0.01)
async def websocket_communication(self, WS_URL):
show_connection_log = True
async def exec_ws_communication(websocket):
if CAMERA_ENABLED:
print(
"\nHold the spacebar to start recording. Press 'c' to capture an image from the camera. Press CTRL-C to exit."
)
else:
print("\nHold the spacebar to start recording. Press CTRL-C to exit.")
asyncio.create_task(self.message_sender(websocket))
while True:
await asyncio.sleep(0.01)
chunk = await websocket.recv()
logger.debug(f"Got this message from the server: {type(chunk)} {chunk}")
# print("received chunk from server")
if type(chunk) == str:
chunk = json.loads(chunk)
if chunk.get("type") == "config":
self.tts_service = chunk.get("tts_service")
continue
if self.tts_service == "elevenlabs":
message = chunk
else:
message = accumulator.accumulate(chunk)
if message == None:
# Will be None until we have a full message ready
continue
# At this point, we have our message
if isinstance(message, bytes) or (
message["type"] == "audio" and message["format"].startswith("bytes")
):
# Convert bytes to audio file
if self.tts_service == "elevenlabs":
audio_bytes = message
audio = audio_bytes
else:
audio_bytes = message["content"]
# Create an AudioSegment instance with the raw data
audio = AudioSegment(
# raw audio data (bytes)
data=audio_bytes,
# signed 16-bit little-endian format
sample_width=2,
# 16,000 Hz frame rate
frame_rate=22050,
# mono sound
channels=1,
)
await self.audiosegments.put(audio)
# Run the code if that's the client's job
if os.getenv("CODE_RUNNER") == "client":
if message["type"] == "code" and "end" in message:
language = message["format"]
code = message["content"]
result = interpreter.computer.run(language, code)
send_queue.put(result)
if is_win10():
logger.info("Windows 10 detected")
# Workaround for Windows 10 not latching to the websocket server.
# See https://github.com/OpenInterpreter/01/issues/197
try:
ws = websockets.connect(WS_URL)
await exec_ws_communication(ws)
except Exception as e:
logger.error(f"Error while attempting to connect: {e}")
else:
while True:
try:
async with websockets.connect(WS_URL) as websocket:
await exec_ws_communication(websocket)
except:
logger.debug(traceback.format_exc())
if show_connection_log:
logger.info(f"Connecting to `{WS_URL}`...")
show_connection_log = False
await asyncio.sleep(2)
async def start_async(self):
# Configuration for WebSocket
WS_URL = f"ws://{self.server_url}"
# Start the WebSocket communication
asyncio.create_task(self.websocket_communication(WS_URL))
# Start watching the kernel if it's your job to do that
if os.getenv("CODE_RUNNER") == "client":
# client is not running code!
asyncio.create_task(put_kernel_messages_into_queue(send_queue))
asyncio.create_task(self.play_audiosegments())
# If Raspberry Pi, add the button listener, otherwise use the spacebar
if current_platform.startswith("raspberry-pi"):
logger.info("Raspberry Pi detected, using button on GPIO pin 15")
# Use GPIO pin 15
pindef = ["gpiochip4", "15"] # gpiofind PIN15
print("PINDEF", pindef)
# HACK: needs passwordless sudo
process = await asyncio.create_subprocess_exec(
"sudo", "gpiomon", "-brf", *pindef, stdout=asyncio.subprocess.PIPE
)
while True:
line = await process.stdout.readline()
if line:
line = line.decode().strip()
if "FALLING" in line:
self.toggle_recording(False)
elif "RISING" in line:
self.toggle_recording(True)
else:
break
else:
# Keyboard listener for spacebar press/release
listener = keyboard.Listener(
on_press=self.on_press, on_release=self.on_release
)
listener.start()
def start(self):
if os.getenv("TEACH_MODE") != "True":
asyncio.run(self.start_async())
p.terminate()

@ -1,99 +0,0 @@
import asyncio
import websockets
import pyaudio
from pynput import keyboard
import json
from yaspin import yaspin
CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 1
RECORDING_RATE = 16000
PLAYBACK_RATE = 24000
class Device:
def __init__(self):
self.server_url = "0.0.0.0:10001"
self.p = pyaudio.PyAudio()
self.websocket = None
self.recording = False
self.input_stream = None
self.output_stream = None
self.spinner = yaspin()
self.play_audio = True
async def connect_with_retry(self, max_retries=50, retry_delay=2):
for attempt in range(max_retries):
try:
self.websocket = await websockets.connect(f"ws://{self.server_url}")
print("Connected to server.")
# Send auth, which the server requires (docs.openinterpreter.com/server/usage)
await self.websocket.send(json.dumps({"auth": True}))
return
except ConnectionRefusedError:
if attempt % 4 == 0:
print(f"Waiting for the server to be ready...")
await asyncio.sleep(retry_delay)
raise Exception("Failed to connect to the server after multiple attempts")
async def send_audio(self):
self.input_stream = self.p.open(format=FORMAT, channels=CHANNELS, rate=RECORDING_RATE, input=True, frames_per_buffer=CHUNK)
while True:
if self.recording:
try:
# Send start flag
await self.websocket.send(json.dumps({"role": "user", "type": "audio", "format": "bytes.wav", "start": True}))
# print("Sending audio start message")
while self.recording:
data = self.input_stream.read(CHUNK, exception_on_overflow=False)
await self.websocket.send(data)
# Send stop flag
await self.websocket.send(json.dumps({"role": "user", "type": "audio", "format": "bytes.wav", "end": True}))
# print("Sending audio end message")
except Exception as e:
print(f"Error in send_audio: {e}")
await asyncio.sleep(0.01)
async def receive_audio(self):
self.output_stream = self.p.open(format=FORMAT, channels=CHANNELS, rate=PLAYBACK_RATE, output=True, frames_per_buffer=CHUNK)
while True:
try:
data = await self.websocket.recv()
if self.play_audio and isinstance(data, bytes) and not self.recording:
self.output_stream.write(data)
except Exception as e:
await self.connect_with_retry()
def on_press(self, key):
if key == keyboard.Key.ctrl and not self.recording:
#print("Space pressed, starting recording")
print("\n")
self.spinner.start()
self.recording = True
def on_release(self, key):
if key == keyboard.Key.ctrl:
self.spinner.stop()
#print("Space released, stopping recording")
self.recording = False
# elif key == keyboard.Key.esc:
# print("Esc pressed, stopping the program")
# return False
async def main(self):
await self.connect_with_retry()
print("Hold CTRL to record. Press 'CTRL-C' to quit.")
listener = keyboard.Listener(on_press=self.on_press, on_release=self.on_release)
listener.start()
await asyncio.gather(self.send_audio(), self.receive_audio())
def start(self):
asyncio.run(self.main())
if __name__ == "__main__":
device = Device()
device.start()

@ -1,28 +0,0 @@
# ESP32 Playback
To set up audio recording + playback on the ESP32 (M5 Atom), do the following:
1. Open Arduino IDE, and open the `client/client.ino` file
2. Go to Tools -> Board -> Boards Manager, search "esp32", then install the boards by Arduino and Espressif
3. Go to Tools -> Manage Libraries, then install the following (_with_ dependencies, if it prompts you to install with/without dependencies):
- M5Atom by M5Stack
- WebSockets by Markus Sattler
- ESPAsyncWebServer by lacamera
4. The board needs to connect to WiFi. Once you flash it, connect to the ESP32's "captive" WiFi network, which will collect your WiFi details. Once it connects, it will ask you to enter the 01OS server address in the format "domain.com:port" or "ip:port". Once it is able to connect, you can use the device.
5. To flash the .ino to the board, connect the board to the USB port, select the port from the dropdown on the IDE, then select the M5Atom board (or M5Stack-ATOM if you have that). Click on upload to flash the board.
### Alternative - PlatformIO
You don't need to install anything else; PlatformIO will install everything for you: dependencies, toolchains, etc.
First, install [PlatformIO](http://platformio.org/), an open-source ecosystem for IoT development compatible with the **Arduino** IDE and its command-line tools (Windows, macOS, and Linux), then enter the firmware directory:
```bash
cd client/
```
And build and upload the firmware with a simple command:
```bash
pio run --target upload
```

@ -26,15 +26,14 @@ class Device:
for attempt in range(max_retries):
try:
self.websocket = await websockets.connect(f"ws://{self.server_url}")
print("Connected to server.")
# Send auth, which the server requires (docs.openinterpreter.com/server/usage)
await self.websocket.send(json.dumps({"auth": True}))
return
except ConnectionRefusedError:
if attempt % 4 == 0:
print(f"Waiting for the server to be ready...")
if attempt % 8 == 0 and attempt != 0:
print(f"Loading...")
await asyncio.sleep(retry_delay)
raise Exception("Failed to connect to the server after multiple attempts")
@ -71,7 +70,7 @@ class Device:
def on_press(self, key):
if key == keyboard.Key.ctrl and not self.recording:
#print("Space pressed, starting recording")
print("\n")
print("")
self.spinner.start()
self.recording = True
@ -86,7 +85,7 @@ class Device:
async def main(self):
await self.connect_with_retry()
print("Hold CTRL to speak to the assistant. Press 'CTRL-C' to quit.")
print("\nHold CTRL to speak to your assistant. Press 'CTRL-C' to quit.")
listener = keyboard.Listener(on_press=self.on_press, on_release=self.on_release)
listener.start()
await asyncio.gather(self.send_audio(), self.receive_audio())

@ -1,13 +0,0 @@
from ..base_device import Device

device = Device()


def main(server_url, debug):
    device.server_url = server_url
    device.debug = debug
    device.start()


if __name__ == "__main__":
    main()

@ -1,14 +0,0 @@
from ..base_device import Device

device = Device()


def main(server_url, debug, play_audio):
    device.server_url = server_url
    device.debug = debug
    device.play_audio = play_audio
    device.start()


if __name__ == "__main__":
    main()

@ -1,11 +0,0 @@
from ..base_device import Device

device = Device()


def main():
    device.start()


if __name__ == "__main__":
    main()

@ -1,13 +0,0 @@
from ..base_device import Device

device = Device()


def main(server_url, debug):
    device.server_url = server_url
    device.debug = debug
    device.start()


if __name__ == "__main__":
    main()

@ -1,252 +0,0 @@
# This is a websocket interpreter, TTS and STT disabled.
# It makes a websocket on port 8000 that sends/receives LMC messages in *streaming* format.
### You MUST send a start and end flag with each message! For example: ###
"""
{"role": "user", "type": "message", "start": True})
{"role": "user", "type": "message", "content": "hi"})
{"role": "user", "type": "message", "end": True})
"""
###
from pynput import keyboard
from .utils.bytes_to_wav import bytes_to_wav
from RealtimeTTS import TextToAudioStream, CoquiEngine, OpenAIEngine, ElevenlabsEngine
from RealtimeSTT import AudioToTextRecorder
import time
import asyncio
import json
import os
class AsyncInterpreter:
def __init__(self, interpreter, debug):
self.stt_latency = None
self.tts_latency = None
self.interpreter_latency = None
# time from first put to first yield
self.tffytfp = None
self.debug = debug
self.interpreter = interpreter
self.audio_chunks = []
# STT
self.stt = AudioToTextRecorder(
model="tiny.en", spinner=False, use_microphone=False
)
self.stt.stop() # It needs this for some reason
# TTS
if self.interpreter.tts == "coqui":
engine = CoquiEngine()
elif self.interpreter.tts == "openai":
engine = OpenAIEngine()
elif self.interpreter.tts == "elevenlabs":
engine = ElevenlabsEngine(api_key=os.environ["ELEVEN_LABS_API_KEY"])
engine.set_voice("Michael")
else:
raise ValueError(f"Unsupported TTS engine: {self.interpreter.tts}")
self.tts = TextToAudioStream(engine)
self.active_chat_messages = []
self._input_queue = asyncio.Queue() # Queue that .input will shove things into
self._output_queue = asyncio.Queue() # Queue to put output chunks into
self._last_lmc_start_flag = None # Unix time of last LMC start flag received
self._in_keyboard_write_block = (
False # Tracks whether interpreter is trying to use the keyboard
)
self.loop = asyncio.get_event_loop()
async def _add_to_queue(self, queue, item):
await queue.put(item)
async def clear_queue(self, queue):
while not queue.empty():
await queue.get()
async def clear_input_queue(self):
await self.clear_queue(self._input_queue)
async def clear_output_queue(self):
await self.clear_queue(self._output_queue)
async def input(self, chunk):
"""
Expects a chunk in streaming LMC format.
"""
if isinstance(chunk, bytes):
# It's probably a chunk of audio
self.stt.feed_audio(chunk)
self.audio_chunks.append(chunk)
# print("INTERPRETER FEEDING AUDIO")
else:
try:
chunk = json.loads(chunk)
except:
pass
if "start" in chunk:
# print("Starting STT")
self.stt.start()
self._last_lmc_start_flag = time.time()
# self.interpreter.computer.terminal.stop() # Stop any code execution... maybe we should make interpreter.stop()?
elif "end" in chunk:
# print("Running OI on input")
asyncio.create_task(self.run())
else:
await self._add_to_queue(self._input_queue, chunk)
def add_to_output_queue_sync(self, chunk):
"""
Synchronous function to add a chunk to the output queue.
"""
# print("ADDING TO QUEUE:", chunk)
asyncio.create_task(self._add_to_queue(self._output_queue, chunk))
def generate(self, message, start_interpreter):
last_lmc_start_flag = self._last_lmc_start_flag
self.interpreter.messages = self.active_chat_messages
# print("message is", message)
for chunk in self.interpreter.chat(message, display=True, stream=True):
if self._last_lmc_start_flag != last_lmc_start_flag:
# self.beeper.stop()
break
# self.add_to_output_queue_sync(chunk) # To send text, not just audio
content = chunk.get("content")
# Handle message blocks
if chunk.get("type") == "message":
if content:
# self.beeper.stop()
# Experimental: The AI voice sounds better with replacements like these, but it should happen at the TTS layer
# content = content.replace(". ", ". ... ").replace(", ", ", ... ").replace("!", "! ... ").replace("?", "? ... ")
# print("yielding ", content)
if self.tffytfp is None:
self.tffytfp = time.time()
yield content
# Handle code blocks
elif chunk.get("type") == "code":
if "start" in chunk:
# self.beeper.start()
pass
# Experimental: If the AI wants to type, we should type immediately
if (
self.interpreter.messages[-1]
.get("content", "")
.startswith("computer.keyboard.write(")
):
keyboard.controller.type(content)
self._in_keyboard_write_block = True
if "end" in chunk and self._in_keyboard_write_block:
self._in_keyboard_write_block = False
# (This will make it so it doesn't type twice when the block executes)
if self.interpreter.messages[-1]["content"].startswith(
"computer.keyboard.write("
):
self.interpreter.messages[-1]["content"] = (
"dummy_variable = ("
+ self.interpreter.messages[-1]["content"][
len("computer.keyboard.write(") :
]
)
# Send a completion signal
if self.debug:
end_interpreter = time.time()
self.interpreter_latency = end_interpreter - start_interpreter
print("INTERPRETER LATENCY", self.interpreter_latency)
# self.add_to_output_queue_sync({"role": "server","type": "completion", "content": "DONE"})
async def run(self):
"""
Runs OI on the audio bytes submitted to the input. Will add streaming LMC chunks to the _output_queue.
"""
self.interpreter.messages = self.active_chat_messages
self.stt.stop()
input_queue = []
while not self._input_queue.empty():
input_queue.append(self._input_queue.get())
if self.debug:
start_stt = time.time()
message = self.stt.text()
end_stt = time.time()
self.stt_latency = end_stt - start_stt
print("STT LATENCY", self.stt_latency)
if self.audio_chunks:
audio_bytes = bytearray(b"".join(self.audio_chunks))
wav_file_path = bytes_to_wav(audio_bytes, "audio/raw")
print("wav_file_path ", wav_file_path)
self.audio_chunks = []
else:
message = self.stt.text()
print(message)
# Feed generate to RealtimeTTS
self.add_to_output_queue_sync(
{"role": "assistant", "type": "audio", "format": "bytes.wav", "start": True}
)
start_interpreter = time.time()
text_iterator = self.generate(message, start_interpreter)
self.tts.feed(text_iterator)
if not self.tts.is_playing():
self.tts.play_async(on_audio_chunk=self.on_tts_chunk, muted=True)
while True:
await asyncio.sleep(0.1)
# print("is_playing", self.tts.is_playing())
if not self.tts.is_playing():
self.add_to_output_queue_sync(
{
"role": "assistant",
"type": "audio",
"format": "bytes.wav",
"end": True,
}
)
if self.debug:
end_tts = time.time()
self.tts_latency = end_tts - self.tts.stream_start_time
print("TTS LATENCY", self.tts_latency)
self.tts.stop()
break
async def _on_tts_chunk_async(self, chunk):
# print("adding chunk to queue")
if self.debug and self.tffytfp is not None and self.tffytfp != 0:
print(
"time from first yield to first put is ",
time.time() - self.tffytfp,
)
self.tffytfp = 0
await self._add_to_queue(self._output_queue, chunk)
def on_tts_chunk(self, chunk):
# print("ye")
asyncio.run(self._on_tts_chunk_async(chunk))
async def output(self):
# print("outputting chunks")
return await self._output_queue.get()
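The module above expects its input in streaming LMC format: a `start` flag, then one or more `content` chunks, then an `end` flag, with raw audio sent as plain bytes in between. A minimal sketch of a client sending one text message this way, assuming the `websockets` package and the port 8000 endpoint named in the header comment:

```python
# Sends a single user message to the streaming LMC websocket:
# start flag -> content chunk(s) -> end flag.
import asyncio
import json

import websockets  # assumed dependency: pip install websockets


async def send_message(text, uri="ws://localhost:8000"):
    async with websockets.connect(uri) as ws:
        await ws.send(json.dumps({"role": "user", "type": "message", "start": True}))
        await ws.send(json.dumps({"role": "user", "type": "message", "content": text}))
        await ws.send(json.dumps({"role": "user", "type": "message", "end": True}))


asyncio.run(send_message("hi"))
```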

@ -1,124 +0,0 @@
import asyncio
import traceback
import json
from fastapi import FastAPI, WebSocket, Depends
from fastapi.responses import PlainTextResponse
from uvicorn import Config, Server
from .async_interpreter import AsyncInterpreter
from fastapi.middleware.cors import CORSMiddleware
from typing import List, Dict, Any
import os
import importlib.util
os.environ["STT_RUNNER"] = "server"
os.environ["TTS_RUNNER"] = "server"
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"], # Allow all methods (GET, POST, etc.)
allow_headers=["*"], # Allow all headers
)
async def get_debug_flag():
return app.state.debug
@app.get("/ping")
async def ping():
return PlainTextResponse("pong")
@app.websocket("/")
async def websocket_endpoint(
websocket: WebSocket, debug: bool = Depends(get_debug_flag)
):
await websocket.accept()
global global_interpreter
interpreter = global_interpreter
# Send the tts_service value to the client
await websocket.send_text(
json.dumps({"type": "config", "tts_service": interpreter.interpreter.tts})
)
try:
async def receive_input():
while True:
if websocket.client_state == "DISCONNECTED":
break
data = await websocket.receive()
await asyncio.sleep(0)
if isinstance(data, bytes):
await interpreter.input(data)
elif "bytes" in data:
await interpreter.input(data["bytes"])
# print("RECEIVED INPUT", data)
elif "text" in data:
# print("RECEIVED INPUT", data)
await interpreter.input(data["text"])
async def send_output():
while True:
output = await interpreter.output()
await asyncio.sleep(0)
if isinstance(output, bytes):
# print(f"Sending {len(output)} bytes of audio data.")
await websocket.send_bytes(output)
elif isinstance(output, dict):
# print("sending text")
await websocket.send_text(json.dumps(output))
await asyncio.gather(send_output(), receive_input())
except Exception as e:
print(f"WebSocket connection closed with exception: {e}")
traceback.print_exc()
finally:
if not websocket.client_state == "DISCONNECTED":
await websocket.close()
async def main(server_host, server_port, profile, debug):
app.state.debug = debug
# Load the profile module from the provided path
spec = importlib.util.spec_from_file_location("profile", profile)
profile_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(profile_module)
# Get the interpreter from the profile
interpreter = profile_module.interpreter
if not hasattr(interpreter, 'tts'):
print("Setting TTS provider to default: openai")
interpreter.tts = "openai"
# Make it async
interpreter = AsyncInterpreter(interpreter, debug)
global global_interpreter
global_interpreter = interpreter
print(f"Starting server on {server_host}:{server_port}")
config = Config(app, host=server_host, port=server_port, lifespan="on")
server = Server(config)
await server.serve()
if __name__ == "__main__":
asyncio.run(main())
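The endpoint above multiplexes everything over a single websocket: the first frame is a JSON `config` message naming the TTS service, and later frames are either JSON text (LMC control chunks) or raw `bytes.wav` audio. A minimal sketch of a client separating the two, assuming the `websockets` package and a placeholder address of `ws://localhost:10101`:

```python
# Reads frames from the 01 websocket server and splits JSON control
# messages from raw audio bytes. The port below is a placeholder.
import asyncio
import json

import websockets  # assumed dependency: pip install websockets


async def receive_output(uri="ws://localhost:10101"):
    audio = bytearray()
    async with websockets.connect(uri) as ws:
        async for frame in ws:
            if isinstance(frame, bytes):
                audio.extend(frame)  # audio chunk ("bytes.wav" format)
            else:
                print(json.loads(frame))  # e.g. {"type": "config", "tts_service": ...}


asyncio.run(receive_output())
```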

@ -1,36 +0,0 @@
# tests currently hang after completion
"""
import pytest
import signal
import os
from .profiles.default import interpreter
from async_interpreter import AsyncInterpreter
from fastapi.testclient import TestClient
from .async_server import app
@pytest.fixture
def client():
return TestClient(app)
@pytest.fixture
def mock_interpreter():
async_interpreter = AsyncInterpreter(interpreter)
yield async_interpreter
async_interpreter.shutdown()
@pytest.fixture(scope="function", autouse=True)
def term_handler():
orig = signal.signal(signal.SIGTERM, signal.getsignal(signal.SIGINT))
yield
signal.signal(signal.SIGTERM, orig)
yield
# Send SIGTERM signal to the current process and its children
os.kill(os.getpid(), signal.SIGTERM)
"""

@ -1,15 +1,17 @@
from RealtimeTTS import TextToAudioStream, CoquiEngine, OpenAIEngine, ElevenlabsEngine
from fastapi.responses import PlainTextResponse
from RealtimeSTT import AudioToTextRecorder
from RealtimeTTS import TextToAudioStream
import importlib
import warnings
import asyncio
import types
import wave
import os
import sys
os.environ["INTERPRETER_REQUIRE_ACKNOWLEDGE"] = "False"
def start_server(server_host, server_port, profile, debug):
def start_server(server_host, server_port, profile, voice, debug):
# Load the profile module from the provided path
spec = importlib.util.spec_from_file_location("profile", profile)
@ -19,6 +21,18 @@ def start_server(server_host, server_port, profile, debug):
# Get the interpreter from the profile
interpreter = profile_module.interpreter
# Apply our settings to it
interpreter.verbose = debug
interpreter.server.host = server_host
interpreter.server.port = server_port
if voice == False:
# If voice is False, just start the standard OI server
interpreter.server.run()
exit()
# ONLY if voice is True, will we run the rest of this file.
# STT
interpreter.stt = AudioToTextRecorder(
model="tiny.en", spinner=False, use_microphone=False
@ -29,21 +43,30 @@ def start_server(server_host, server_port, profile, debug):
if not hasattr(interpreter, 'tts'):
print("Setting TTS provider to default: openai")
interpreter.tts = "openai"
if interpreter.tts == "coqui":
from RealtimeTTS import CoquiEngine
engine = CoquiEngine()
elif interpreter.tts == "openai":
engine = OpenAIEngine(voice="onyx")
from RealtimeTTS import OpenAIEngine
if hasattr(interpreter, 'voice'):
voice = interpreter.voice
else:
voice = "onyx"
engine = OpenAIEngine(voice=voice)
elif interpreter.tts == "elevenlabs":
engine = ElevenlabsEngine(api_key=os.environ["ELEVEN_LABS_API_KEY"])
engine.set_voice("Will")
from RealtimeTTS import ElevenlabsEngine
engine = ElevenlabsEngine()
if hasattr(interpreter, 'voice'):
voice = interpreter.voice
else:
voice = "Will"
engine.set_voice(voice)
else:
raise ValueError(f"Unsupported TTS engine: {interpreter.tts}")
interpreter.tts = TextToAudioStream(engine)
# Misc Settings
interpreter.verbose = debug
interpreter.server.host = server_host
interpreter.server.port = server_port
interpreter.play_audio = False
interpreter.audio_chunks = []
@ -66,7 +89,10 @@ def start_server(server_host, server_port, profile, debug):
self.stt.stop()
content = self.stt.text()
print("\n\nUser: ", content)
if content.strip() == "":
return
print(">", content.strip())
if False:
audio_bytes = bytearray(b"".join(self.audio_chunks))
@ -127,6 +153,6 @@ def start_server(server_host, server_port, profile, debug):
return PlainTextResponse("pong")
# Start server
interpreter.server.display = True
interpreter.print = True
interpreter.debug = False
interpreter.server.run()

@ -1,242 +0,0 @@
# The dynamic system message is where most of the 01's behavior is configured.
# You can put code into the system message {{ in brackets like this }}
# which will be rendered just before the interpreter starts writing a message.
import os
system_message = r"""
You are the 01, a SCREENLESS executive assistant that can complete any task.
When you execute code, it will be executed on the user's machine. The user has given you full and complete permission to execute any code necessary to complete the task. Execute the code.
You can access the internet. Run any code to achieve the goal, and if at first you don't succeed, try again and again.
You can install new packages.
Be concise. Your messages are being read aloud to the user. DO NOT MAKE PLANS. RUN CODE QUICKLY.
Try to spread complex tasks over multiple code blocks. Don't try to do complex tasks in one go.
Manually summarize text.
Use computer.browser.search for almost everything. Use Applescript frequently.
The user is in Seattle, Washington.
To send email, use Applescript. To check calendar events, use iCal buddy (e.g. `/opt/homebrew/bin/icalBuddy eventsFrom:today to:+7`)
DONT TELL THE USER THE METHOD YOU'LL USE. Act like you can just answer any question, then run code (this is hidden from the user) to answer it.
Your responses should be very short, no more than 1-2 sentences long.
DO NOT USE MARKDOWN. ONLY WRITE PLAIN TEXT. DO NOT USE MARKDOWN.
# TASKS
You should help the user manage their tasks.
Store the user's tasks in a Python list called `tasks`.
---
The user's current task is: {{ tasks[0] if tasks else "No current tasks." }}
{{
if len(tasks) > 1:
print("The next task is: ", tasks[1])
}}
---
When the user completes the current task, you should remove it from the list and read the next item by running `tasks = tasks[1:]\ntasks[0]`. Then, tell the user what the next task is.
When the user tells you about a set of tasks, you should intelligently order tasks, batch similar tasks, and break down large tasks into smaller tasks (for this, you should consult the user and get their permission to break it down). Your goal is to manage the task list as intelligently as possible, to make the user as efficient and non-overwhelmed as possible. They will require a lot of encouragement, support, and kindness. Don't say too much about what's ahead of them; just try to focus them on one step at a time.
After starting a task, you should check in with the user around the estimated completion time to see if the task is completed.
To do this, schedule a reminder based on estimated completion time using the function `schedule(days=0, hours=0, mins=0, secs=0, datetime="valid date time", message="Your message here.")`, WHICH HAS ALREADY BEEN IMPORTED. YOU DON'T NEED TO IMPORT THE `schedule` FUNCTION. IT IS AVAILABLE. You'll receive the message at the time you scheduled it.
You guide the user through the list one task at a time, convincing them to move forward, giving a pep talk if need be. Your job is essentially to answer "what should I (the user) be doing right now?" for every moment of the day.
# BROWSER
The Google search result will be returned from this function as a string: `computer.browser.search("query")`
# CRITICAL NOTES
Code output, despite being sent to you by the user, cannot be seen by the user. You NEED to tell the user about the output of some code, even if it's exact. >>The user does not have a screen.<<
ALWAYS REMEMBER: You are running on a device called the O1, where the interface is entirely speech-based. Make your responses to the user VERY short. DO NOT PLAN. BE CONCISE. WRITE CODE TO RUN IT.
Translate things to other languages INSTANTLY and MANUALLY. Don't try to use a translation tool. Summarize things manually. Don't use a summarizer tool.
"""
# OLD SYSTEM MESSAGE
old_system_message = r"""
You are the 01, an executive assistant that can complete **any** task.
When you execute code, it will be executed **on the user's machine**. The user has given you **full and complete permission** to execute any code necessary to complete the task. Execute the code.
You can access the internet. Run **any code** to achieve the goal, and if at first you don't succeed, try again and again.
You can install new packages.
Be concise. Your messages are being read aloud to the user. DO NOT MAKE PLANS. Immediately run code.
Try to spread complex tasks over multiple code blocks.
Manually summarize text. You cannot use other libraries to do this. You MUST MANUALLY SUMMARIZE, WITHOUT CODING.
For the user's request, first choose whether you want to use Python, Applescript, Shell, or computer control (below) via Python.
# USER'S TASKS
You should help the user manage their tasks.
Store the user's tasks in a Python list called `tasks`.
---
The user's current task is: {{ tasks[0] if tasks else "No current tasks." }}
{{
if len(tasks) > 1:
print("The next task is: ", tasks[1])
}}
---
When the user completes the current task, you should remove it from the list and read the next item by running `tasks = tasks[1:]\ntasks[0]`. Then, tell the user what the next task is.
When the user tells you about a set of tasks, you should intelligently order tasks, batch similar tasks, and break down large tasks into smaller tasks (for this, you should consult the user and get their permission to break it down). Your goal is to manage the task list as intelligently as possible, to make the user as efficient and non-overwhelmed as possible. They will require a lot of encouragement, support, and kindness. Don't say too much about what's ahead of them; just try to focus them on one step at a time.
After starting a task, you should check in with the user around the estimated completion time to see if the task is completed. Use the `schedule(datetime, message)` function, which has already been imported.
To do this, schedule a reminder based on estimated completion time using the function `schedule(datetime_object, "Your message here.")`, WHICH HAS ALREADY BEEN IMPORTED. YOU DON'T NEED TO IMPORT THE `schedule` FUNCTION. IT IS AVAILABLE. You'll receive the message at `datetime_object`.
You guide the user through the list one task at a time, convincing them to move forward, giving a pep talk if need be. Your job is essentially to answer "what should I (the user) be doing right now?" for every moment of the day.
# COMPUTER CONTROL (RARE)
You are a computer controlling language model. You can 100% control the user's GUI.
You may use the `computer` Python module (already imported) to control the user's keyboard and mouse, if the task **requires** it:
```python
computer.browser.search(query)
computer.display.view() # Shows you what's on the screen, returns a `pil_image` in case you need it (rarely). **You almost always want to do this first!**
computer.keyboard.hotkey(" ", "command") # Opens spotlight
computer.keyboard.write("hello")
computer.mouse.click("text onscreen") # This clicks on the UI element with that text. Use this **frequently** and get creative! To click a video, you could pass the *timestamp* (which is usually written on the thumbnail) into this.
computer.mouse.move("open recent >") # This moves the mouse over the UI element with that text. Many dropdowns will disappear if you click them. You have to hover over items to reveal more.
computer.mouse.click(x=500, y=500) # Use this very, very rarely. It's highly inaccurate
computer.mouse.click(icon="gear icon") # Moves mouse to the icon with that description. Use this very often
computer.mouse.scroll(-10) # Scrolls down. If you don't find some text on screen that you expected to be there, you probably want to do this
x, y = computer.display.center() # Get your bearings
computer.clipboard.view() # Returns contents of clipboard
computer.os.get_selected_text() # Use frequently. If editing text, the user often wants this
```
You are an image-based AI, you can see images.
Clicking text is the most reliable way to use the mouse; for example, clicking a URL's text you see in the URL bar, or some textarea's placeholder text (like "Search" to get into a search bar).
If you use `plt.show()`, the resulting image will be sent to you. However, if you use `PIL.Image.show()`, the resulting image will NOT be sent to you.
It is very important to make sure you are focused on the right application and window. Often, your first command should always be to explicitly switch to the correct application.
When searching the web, use query parameters. For example, https://www.amazon.com/s?k=monitor
Try multiple methods before saying the task is impossible. **You can do it!**
{{
# Add window information
import sys
import os
import json
original_stdout = sys.stdout
sys.stdout = open(os.devnull, 'w')
original_stderr = sys.stderr
sys.stderr = open(os.devnull, 'w')
try:
import pywinctl
active_window = pywinctl.getActiveWindow()
if active_window:
app_info = ""
if "_appName" in active_window.__dict__:
app_info += (
"Active Application: " + active_window.__dict__["_appName"]
)
if hasattr(active_window, "title"):
app_info += "\n" + "Active Window Title: " + active_window.title
elif "_winTitle" in active_window.__dict__:
app_info += (
"\n"
+ "Active Window Title:"
+ active_window.__dict__["_winTitle"]
)
if app_info != "":
print(app_info)
except:
# Non blocking
pass
finally:
sys.stdout = original_stdout
sys.stderr = original_stderr
}}
# SKILLS
Try to use the following functions (assume they're imported) to complete your goals whenever possible:
{{
import sys
import os
import json
from interpreter import interpreter
from pathlib import Path
interpreter.model = "gpt-3.5"
combined_messages = "\\n".join(json.dumps(x) for x in messages[-3:])
#query_msg = interpreter.chat(f"This is the conversation so far: {combined_messages}. What is a <10 words query that could be used to find functions that would help answer the user's question?")
#query = query_msg[0]['content']
query = combined_messages
interpreter.computer.skills.path = '''OI_SKILLS_DIR'''
skills = interpreter.computer.skills.search(query)
lowercase_skills = [skill[0].lower() + skill[1:] for skill in skills]
output = "\\n".join(lowercase_skills)
# VERY HACKY! We should fix this, we hard code it for noisy code^:
print("IGNORE_ALL_ABOVE_THIS_LINE")
print(output)
}}
Remember: You can run Python code outside a function only to run a Python function; all other code must go in a Python function if you first write a Python function. ALL imports must go inside the function.
# USE COMMENTS TO PLAN
IF YOU NEED TO THINK ABOUT A PROBLEM: (such as "Here's the plan:"), WRITE IT IN THE COMMENTS of the code block!
For example:
> User: What is 432/7?
> Assistant: Let me use Python to calculate that.
> Assistant Python function call:
> # Here's the plan:
> # 1. Divide the numbers
> # 2. Round it to 3 digits.
> print(round(432/7, 3))
> Assistant: 432 / 7 is 61.714.
# FINAL MESSAGES
ALWAYS REMEMBER: You are running on a device called the O1, where the interface is entirely speech-based. Make your responses to the user **VERY short.**
""".strip().replace(
"OI_SKILLS_DIR", os.path.join(os.path.dirname(__file__), "skills")
)

@ -1,136 +0,0 @@
# The dynamic system message is where most of the 01's behavior is configured.
# You can put code into the system message {{ in brackets like this }}
# which will be rendered just before the interpreter starts writing a message.
import os
system_message = r"""
You are the 01, an executive assistant that can complete **any** task.
When you execute code, it will be executed **on the user's machine**. The user has given you **full and complete permission** to execute any code necessary to complete the task. Execute the code.
For the user's request, ALWAYS CHOOSE PYTHON. If the task requires computer control, USE THE computer control (mentioned below) or the Skills library (also mentioned below) via Python.
Try to execute the user's request with the computer control or the Skills library first. ONLY IF the task cannot be completed using the computer control or the skills library, write your own code.
If you're writing your own code, YOU CAN ACCESS THE INTERNET. Run **any code** to achieve the goal, and if at first you don't succeed, try again and again.
You can install new packages.
Be concise. DO NOT MAKE PLANS. Immediately run code.
Try to spread complex tasks over multiple code blocks.
Manually summarize text. You cannot use other libraries to do this. You MUST MANUALLY SUMMARIZE, WITHOUT CODING.
When a user refers to a filename, they're likely referring to an existing file in the directory you're currently executing code in.
# COMPUTER CONTROL
You are a computer controlling language model. You can 100% control the user's GUI.
You may use the `computer` Python module to control the user's keyboard and mouse, if the task **requires** it:
```python
from interpreter import interpreter
import os
import time
interpreter.computer.browser.search(query)
interpreter.computer.display.view() # Shows you what's on the screen, returns a `pil_image` in case you need it (rarely). **You almost always want to do this first!**
interpreter.computer.keyboard.hotkey(" ", "command") # Opens spotlight
interpreter.computer.keyboard.write("hello")
interpreter.computer.mouse.click("text onscreen") # This clicks on the UI element with that text. Use this **frequently** and get creative! To click a video, you could pass the *timestamp* (which is usually written on the thumbnail) into this.
interpreter.computer.mouse.move("open recent >") # This moves the mouse over the UI element with that text. Many dropdowns will disappear if you click them. You have to hover over items to reveal more.
interpreter.computer.mouse.click(x=500, y=500) # Use this very, very rarely. It's highly inaccurate
interpreter.computer.mouse.click(icon="gear icon") # Moves mouse to the icon with that description. Use this very often
interpreter.computer.mouse.scroll(-10) # Scrolls down. If you don't find some text on screen that you expected to be there, you probably want to do this
x, y = interpreter.computer.display.center() # Get your bearings
interpreter.computer.clipboard.view() # Returns contents of clipboard
interpreter.computer.os.get_selected_text() # Use frequently. If editing text, the user often wants this
```
You are an image-based AI, you can see images.
Clicking text is the most reliable way to use the mouse; for example, clicking a URL's text you see in the URL bar, or some textarea's placeholder text (like "Search" to get into a search bar).
If you use `plt.show()`, the resulting image will be sent to you. However, if you use `PIL.Image.show()`, the resulting image will NOT be sent to you.
It is very important to make sure you are focused on the right application and window. Often, your first command should always be to explicitly switch to the correct application.
When searching the web, use query parameters. For example, https://www.amazon.com/s?k=monitor
Try multiple methods before saying the task is impossible. **You can do it!**
{{
import sys
import os
import json
original_stdout = sys.stdout
sys.stdout = open(os.devnull, 'w')
original_stderr = sys.stderr
sys.stderr = open(os.devnull, 'w')
try:
import pywinctl
active_window = pywinctl.getActiveWindow()
if active_window:
app_info = ""
if "_appName" in active_window.__dict__:
app_info += (
"Active Application: " + active_window.__dict__["_appName"]
)
if hasattr(active_window, "title"):
app_info += "\n" + "Active Window Title: " + active_window.title
elif "_winTitle" in active_window.__dict__:
app_info += (
"\n"
+ "Active Window Title:"
+ active_window.__dict__["_winTitle"]
)
if app_info != "":
print(app_info)
except:
pass
finally:
sys.stdout = original_stdout
sys.stderr = original_stderr
}}
# SKILLS LIBRARY
This is the skills library. Try to use the following functions to complete your goals WHENEVER POSSIBLE:
{{
import sys
import os
import json
from interpreter import interpreter
from pathlib import Path
interpreter.model = "gpt-3.5"
combined_messages = "\\n".join(json.dumps(x) for x in messages[-3:])
#query_msg = interpreter.chat(f"This is the conversation so far: {combined_messages}. What is a <10 words query that could be used to find functions that would help answer the user's question?")
#query = query_msg[0]['content']
query = combined_messages
interpreter.computer.skills.path = '''OI_SKILLS_DIR'''
skills = interpreter.computer.skills.search(query)
lowercase_skills = [skill[0].lower() + skill[1:] for skill in skills]
output = "\\n".join(lowercase_skills)
# VERY HACKY! We should fix this, we hard code it for noisy code^:
#print("IGNORE_ALL_ABOVE_THIS_LINE")
print(output)
}}
Remember: You can run Python code outside a function only to run a Python function; all other code must go in a Python function if you first write a Python function. ALL imports must go inside the function.
""".strip().replace(
"OI_SKILLS_DIR", os.path.abspath(os.path.join(os.path.dirname(__file__), "skills"))
)

@ -11,15 +11,13 @@ def test_poetry_run_01():
while True:
output = process.stdout.readline().decode('utf-8')
if "Hold spacebar to record." in output:
if "Hold" in output:
assert True
return
if time.time() > timeout:
assert False, "Timeout reached without finding expected output."
return
# @pytest.mark.skip(reason="pytest hanging")
# def test_ping(client):
# response = client.get("/ping")

@ -1,30 +0,0 @@
import ngrok
import pyqrcode
from ..utils.print_markdown import print_markdown
def create_tunnel(
server_host="localhost", server_port=10101, qr=False, domain=None
):
"""
To use most of ngrok's features, you'll need an authtoken. To obtain one, sign up for free at ngrok.com and
retrieve it from the authtoken page in your ngrok dashboard.
https://dashboard.ngrok.com/get-started/your-authtoken
You can set it as `NGROK_AUTHTOKEN` in your environment variables
"""
print_markdown("Exposing server to the internet...")
if domain:
listener = ngrok.forward(f"{server_host}:{server_port}", authtoken_from_env=True, domain=domain)
else:
listener = ngrok.forward(f"{server_host}:{server_port}", authtoken_from_env=True)
listener_url = listener.url()
print(f"Ingress established at: {listener_url}");
if listener_url and qr:
text = pyqrcode.create(listener_url)
print(text.terminal(quiet_zone=1))
return listener_url

@ -1,67 +0,0 @@
from datetime import datetime
import os
import contextlib
import tempfile
import ffmpeg
import subprocess
def convert_mime_type_to_format(mime_type: str) -> str:
if mime_type == "audio/x-wav" or mime_type == "audio/wav":
return "wav"
if mime_type == "audio/webm":
return "webm"
if mime_type == "audio/raw":
return "dat"
return mime_type
@contextlib.contextmanager
def export_audio_to_wav_ffmpeg(audio: bytearray, mime_type: str) -> str:
temp_dir = tempfile.gettempdir()
# Create a temporary file with the appropriate extension
input_ext = convert_mime_type_to_format(mime_type)
input_path = os.path.join(
temp_dir, f"input_{datetime.now().strftime('%Y%m%d%H%M%S%f')}.{input_ext}"
)
with open(input_path, "wb") as f:
f.write(audio)
# Check if the input file exists
assert os.path.exists(input_path), f"Input file does not exist: {input_path}"
# Export to wav
output_path = os.path.join(
temp_dir, f"output_{datetime.now().strftime('%Y%m%d%H%M%S%f')}.wav"
)
# print(mime_type, input_path, output_path)
if mime_type == "audio/raw":
ffmpeg.input(
input_path,
f="s16le",
ar="16000",
ac=1,
).output(output_path, loglevel="panic").run()
else:
ffmpeg.input(input_path).output(
output_path, acodec="pcm_s16le", ac=1, ar="16k", loglevel="panic"
).run()
try:
yield output_path
finally:
os.remove(input_path)
def run_command(command):
result = subprocess.run(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True
)
return result.stdout, result.stderr
def bytes_to_wav(audio_bytes: bytearray, mime_type):
with export_audio_to_wav_ffmpeg(audio_bytes, mime_type) as wav_file_path:
return wav_file_path

@ -1,24 +0,0 @@
from dotenv import load_dotenv
load_dotenv() # take environment variables from .env.
import os
import logging
logger: logging.Logger = logging.getLogger("01")
root_logger: logging.Logger = logging.getLogger()
def _basic_config() -> None:
logging.basicConfig(format="%(message)s")
def setup_logging() -> None:
env = os.environ.get("LOG_LEVEL", "").upper()
if env == "DEBUG":
_basic_config()
logger.setLevel(logging.DEBUG)
root_logger.setLevel(logging.DEBUG)
elif env == "INFO":
_basic_config()
logger.setLevel(logging.INFO)

@ -1,33 +0,0 @@
import os
import psutil
import signal
def kill_process_tree():
pid = os.getpid() # Get the current process ID
try:
# Send SIGTERM to the entire process group to ensure all processes are targeted
try:
os.killpg(os.getpgid(pid), signal.SIGKILL)
# Windows implementation
except AttributeError:
os.kill(pid, signal.SIGTERM)
parent = psutil.Process(pid)
children = parent.children(recursive=True)
for child in children:
print(f"Forcefully terminating child PID {child.pid}")
child.kill() # Forcefully kill the child process immediately
gone, still_alive = psutil.wait_procs(children, timeout=3)
if still_alive:
for child in still_alive:
print(f"Child PID {child.pid} still alive, attempting another kill")
child.kill()
print(f"Forcefully terminating parent PID {pid}")
parent.kill() # Forcefully kill the parent process immediately
parent.wait(3) # Wait for the parent process to terminate
except psutil.NoSuchProcess:
print(f"Process {pid} does not exist or is already terminated")
except psutil.AccessDenied:
print("Permission denied to terminate some processes")

@ -1,93 +0,0 @@
class Accumulator:
def __init__(self):
self.template = {"role": None, "type": None, "format": None, "content": None}
self.message = self.template
def accumulate(self, chunk):
# print(str(chunk)[:100])
if type(chunk) == dict:
if "format" in chunk and chunk["format"] == "active_line":
# We don't do anything with these
return None
if "start" in chunk:
self.message = chunk
self.message.pop("start")
return None
if "content" in chunk:
if any(
self.message[key] != chunk[key]
for key in self.message
if key != "content"
):
self.message = chunk
if "content" not in self.message:
self.message["content"] = chunk["content"]
else:
if type(chunk["content"]) == dict:
# dict concatenation cannot happen, so we see if chunk is a dict
self.message["content"]["content"] += chunk["content"][
"content"
]
else:
self.message["content"] += chunk["content"]
return None
if "end" in chunk:
# We will proceed
message = self.message
self.message = self.template
return message
if type(chunk) == bytes:
if "content" not in self.message or type(self.message["content"]) != bytes:
self.message["content"] = b""
self.message["content"] += chunk
return None
def accumulate_mobile(self, chunk):
# print(str(chunk)[:100])
if type(chunk) == dict:
if "format" in chunk and chunk["format"] == "active_line":
# We don't do anything with these
return None
if "start" in chunk:
self.message = chunk
self.message.pop("start")
return None
if "content" in chunk:
if any(
self.message[key] != chunk[key]
for key in self.message
if key != "content"
):
self.message = chunk
if "content" not in self.message:
self.message["content"] = chunk["content"]
else:
if type(chunk["content"]) == dict:
# dict concatenation cannot happen, so we see if chunk is a dict
self.message["content"]["content"] += chunk["content"][
"content"
]
else:
self.message["content"] += chunk["content"]
return None
if "end" in chunk:
# We will proceed
message = self.message
self.message = self.template
return message
if type(chunk) == bytes:
if "content" not in self.message or type(self.message["content"]) != bytes:
self.message["content"] = b""
self.message["content"] += chunk
self.message["type"] = "audio"
self.message["format"] = "bytes.wav"
return self.message
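The `Accumulator` above folds streaming LMC chunks back into complete messages: a `start` chunk resets the buffer, `content` chunks are concatenated, and an `end` chunk returns the assembled message. A minimal sketch of that flow, assuming the class is importable (the import path below is hypothetical):

```python
# Rebuilds one complete LMC message from streaming chunks.
from accumulator import Accumulator  # hypothetical import path

acc = Accumulator()
chunks = [
    {"role": "assistant", "type": "message", "start": True},
    {"role": "assistant", "type": "message", "content": "Hello, "},
    {"role": "assistant", "type": "message", "content": "world."},
    {"role": "assistant", "type": "message", "end": True},
]

for chunk in chunks:
    message = acc.accumulate(chunk)
    if message is not None:
        print(message)  # {'role': 'assistant', 'type': 'message', 'content': 'Hello, world.'}
```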

@ -1,10 +0,0 @@
from rich.console import Console
from rich.markdown import Markdown
def print_markdown(markdown_text):
console = Console()
md = Markdown(markdown_text)
print("")
console.print(md)
print("")