From 3319a5d492755f8f509233f0fd6638474008ef6b Mon Sep 17 00:00:00 2001 From: killian <63927363+KillianLucas@users.noreply.github.com> Date: Sat, 27 Jan 2024 20:08:27 -0800 Subject: [PATCH] Websocket --- OS/01/app/index.html | 45 +++++++++++-- .../computer_api_extensions/clock.py | 33 ++++++++++ OS/01/core/interpreter/main.py | 65 ++++++++++++++----- OS/01/core/interpreter/start.py | 15 +++++ 4 files changed, 136 insertions(+), 22 deletions(-) create mode 100644 OS/01/core/interpreter/computer_api_extensions/clock.py diff --git a/OS/01/app/index.html b/OS/01/app/index.html index d9a066c..108c012 100644 --- a/OS/01/app/index.html +++ b/OS/01/app/index.html @@ -1,5 +1,42 @@ - \ No newline at end of file + + Chat + + + +
+ + +
+
+ + + + \ No newline at end of file diff --git a/OS/01/core/interpreter/computer_api_extensions/clock.py b/OS/01/core/interpreter/computer_api_extensions/clock.py new file mode 100644 index 0000000..69c0544 --- /dev/null +++ b/OS/01/core/interpreter/computer_api_extensions/clock.py @@ -0,0 +1,33 @@ +import threading +from datetime import datetime +import json +import time + +class Clock: + def __init__(self, computer): + self.computer = computer + + def schedule(self, dt, message): + # Calculate the delay in seconds + delay = (dt - datetime.now()).total_seconds() + + # Create a timer + timer = threading.Timer(delay, self.add_message_to_queue, args=[message]) + + # Start the timer + timer.start() + + def add_message_to_queue(self, message): + + # Define the message data and convert it to JSON + message_json = json.dumps({ + "role": "computer", + "type": "message", + "content": message + }) + + # Write the JSON data to the file + timestamp = str(int(time.time())) + with open(f"/01/core/queue/{timestamp}.json", "w") as file: + file.write(message_json) + diff --git a/OS/01/core/interpreter/main.py b/OS/01/core/interpreter/main.py index 447a5c9..1d38cc1 100644 --- a/OS/01/core/interpreter/main.py +++ b/OS/01/core/interpreter/main.py @@ -1,33 +1,62 @@ """ -Responsible for taking an interpreter, then serving it at "/" as a POST SSE endpoint, accepting and streaming LMC Messages. +Responsible for taking an interpreter, then serving it at "/" as a websocket, accepting and streaming LMC Messages. https://docs.openinterpreter.com/protocols/lmc-messages Also needs to be saving conversations, and checking the queue. """ -from typing import Generator import uvicorn -from fastapi import FastAPI, Request, Response -from starlette.exceptions import DisconnectedClientError +from fastapi import FastAPI, WebSocket +import asyncio +import json def main(interpreter): app = FastAPI() - @app.post("/") - async def i_endpoint(request: Request) -> Response: - async def event_stream() -> Generator[str, None, None]: - data = await request.json() - # TODO: Save conversation to /conversations - try: - for response in interpreter.chat(message=data["message"], stream=True): - yield response - # TODO: Check queue. Do we need to break (I guess we need a while loop around this..?) - # and handle the new message from the queue? Then delete the message from the queue. - except DisconnectedClientError: - print("Client disconnected") - # TODO: Save conversation to /conversations - return Response(event_stream(), media_type="text/event-stream") + @app.websocket("/") + async def i_test(websocket: WebSocket): + await websocket.accept() + while True: + data = await websocket.receive_text() + while data.strip().lower() != "stop": # Stop command + task = asyncio.create_task(websocket.receive_text()) + + # This would be terrible for production. Just for testing. + try: + data_dict = json.loads(data) + if set(data_dict.keys()) == {"role", "content", "type"} or set( + data_dict.keys() + ) == {"role", "content", "type", "format"}: + data = data_dict + except json.JSONDecodeError: + pass + + for response in interpreter.chat( + message=data, stream=True, display=False + ): + if task.done(): + data = task.result() # Get the new message + break # Break the loop and start processing the new message + # Send out assistant message chunks + if ( + response.get("type") == "message" + and response["role"] == "assistant" + and "content" in response + ): + await websocket.send_text(response["content"]) + await asyncio.sleep(0.01) # Add a small delay + if ( + response.get("type") == "message" + and response["role"] == "assistant" + and response.get("end") == True + ): + await websocket.send_text("\n") + await asyncio.sleep(0.01) # Add a small delay + if not task.done(): + data = ( + await task + ) # Wait for the next message if it hasn't arrived yet uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file diff --git a/OS/01/core/interpreter/start.py b/OS/01/core/interpreter/start.py index 75b6b9b..d19a6ed 100644 --- a/OS/01/core/interpreter/start.py +++ b/OS/01/core/interpreter/start.py @@ -5,6 +5,7 @@ Responsible for configuring an interpreter, then using main.py to serve it at "/ from .main import main from interpreter import interpreter import os +import glob ### SYSTEM MESSAGE @@ -42,6 +43,20 @@ You guide the user through the list one task at a time, convincing them to move interpreter.system_message = system_message +# Give it access to the computer API + +# Get a list of all .py files in the /computer_api_extensions directory +computer_api_extensions = glob.glob('/computer_api_extensions/*.py') + +# Read the content of each file and store it in a list +computer_api_extensions_content = [] +for file in computer_api_extensions: + with open(file, 'r') as f: + computer_api_extensions_content.append(f.read()) + +for content in computer_api_extensions_content: + interpreter.computer.run("python", content) + ### LLM SETTINGS