clean up + error handling

Former-commit-id: de03769f8b
workerULTRANODE
Kye 2 years ago
parent 1f8079671b
commit 1e22d9b20a

@ -5,7 +5,7 @@ from typing import Dict, List
from fastapi.templating import Jinja2Templates
from swarms.agents.utils.manager import AgentManager
from swarms.agents.utils.AgentManager import AgentManager
from swarms.utils.main import BaseHandler, FileHandler, FileType
from swarms.tools.main import CsvToDataframe, ExitConversation, RequestsGet, CodeEditor, Terminal

@ -1,3 +1,3 @@
# from swarms import Swarms, swarm
from swarms.swarms import Swarms, swarm
from swarms.agents import worker_node, UltraNode
from swarms.agents import worker_node, WorkerUltraNode

@ -11,7 +11,7 @@ from langchain.callbacks.base import BaseCallbackManager
from .chat_agent import ConversationalChatAgent
from .llm import ChatOpenAI
from .parser import EvalOutputParser
from .EvalOutputParser import EvalOutputParser
class AgentBuilder:
@ -21,9 +21,9 @@ class AgentBuilder:
self.global_tools: list = None
self.toolsets = toolsets
def build_llm(self, callback_manager: BaseCallbackManager = None):
def build_llm(self, callback_manager: BaseCallbackManager = None, openai_api_key: str = None):
self.llm = ChatOpenAI(
temperature=0, callback_manager=callback_manager, verbose=True
temperature=0, callback_manager=callback_manager, verbose=True, openai_api_key=openai_api_key
)
self.llm.check_access()

@ -0,0 +1,86 @@
from typing import Dict, Optional
import logging
from celery import Task
from langchain.agents.agent import AgentExecutor
from langchain.callbacks.manager import CallbackManager
from langchain.chains.conversation.memory import ConversationBufferMemory
from langchain.memory.chat_memory import BaseChatMemory
from swarms.tools.main import BaseToolSet, ToolsFactory
from .AgentBuilder import AgentBuilder
from .callback import EVALCallbackHandler, ExecutionTracingCallbackHandler
CallbackManager.set_handler(handler=EVALCallbackHandler())
class AgentManager:
def __init__(self, toolsets: list[BaseToolSet] = []):
if not isinstance(toolsets, list):
raise TypeError("Toolsets must be a list")
self.toolsets: list[BaseToolSet] = toolsets
self.memories: Dict[str, BaseChatMemory] = {}
self.executors: Dict[str, AgentExecutor] = {}
def create_memory(self) -> BaseChatMemory:
return ConversationBufferMemory(memory_key="chat_history", return_messages=True)
def get_or_create_memory(self, session: str) -> BaseChatMemory:
if not isinstance(session, str):
raise TypeError("Session must be a string")
if not session:
raise ValueError("Session is empty")
if not (session in self.memories):
self.memories[session] = self.create_memory()
return self.memories[session]
def create_executor(self, session: str, execution: Optional[Task] = None) -> AgentExecutor:
try:
builder = AgentBuilder(self.toolsets)
builder.build_parser()
callbacks = []
eval_callback = EVALCallbackHandler()
eval_callback.set_parser(builder.get_parser())
callbacks.append(eval_callback)
if execution:
execution_callback = ExecutionTracingCallbackHandler(execution)
execution_callback.set_parser(builder.get_parser())
callbacks.append(execution_callback)
callback_manager = CallbackManager(callbacks)
builder.build_llm(callback_manager)
builder.build_global_tools()
memory: BaseChatMemory = self.get_or_create_memory(session)
tools = [
*builder.get_global_tools(),
*ToolsFactory.create_per_session_tools(
self.toolsets,
get_session=lambda: (session, self.executors[session]),
),
]
for tool in tools:
tool.callback_manager = callback_manager
executor = AgentExecutor.from_agent_and_tools(
agent=builder.get_agent(),
tools=tools,
memory=memory,
callback_manager=callback_manager,
verbose=True,
)
self.executors[session] = executor
return executor
except Exception as e:
logging.error(f"Error while creating executor: {str(e)}")
raise e
@staticmethod
def create(toolsets: list[BaseToolSet]) -> "AgentManager":
if not isinstance(toolsets, list):
raise TypeError("Toolsets must be a list")
return AgentManager(toolsets=toolsets)

@ -1,82 +0,0 @@
from typing import Dict, Optional
# from celery import Task
from langchain.agents.agent import AgentExecutor
from langchain.callbacks.manager import CallbackManager
# from langchain.callbacks.base import set_handler
from langchain.chains.conversation.memory import ConversationBufferMemory
from langchain.memory.chat_memory import BaseChatMemory
from swarms.tools.main import BaseToolSet, ToolsFactory
from .builder import AgentBuilder
from .callback import EVALCallbackHandler, ExecutionTracingCallbackHandler
CallbackManager.set_handler(handler=EVALCallbackHandler())
class AgentManager:
def __init__(
self,
toolsets: list[BaseToolSet] = [],
):
self.toolsets: list[BaseToolSet] = toolsets
self.memories: Dict[str, BaseChatMemory] = {}
self.executors: Dict[str, AgentExecutor] = {}
def create_memory(self) -> BaseChatMemory:
return ConversationBufferMemory(memory_key="chat_history", return_messages=True)
def get_or_create_memory(self, session: str) -> BaseChatMemory:
if not (session in self.memories):
self.memories[session] = self.create_memory()
return self.memories[session]
def create_executor(
self, session: str, execution: Optional[Task] = None
) -> AgentExecutor:
builder = AgentBuilder(self.toolsets)
builder.build_parser()
callbacks = []
eval_callback = EVALCallbackHandler()
eval_callback.set_parser(builder.get_parser())
callbacks.append(eval_callback)
if execution:
execution_callback = ExecutionTracingCallbackHandler(execution)
execution_callback.set_parser(builder.get_parser())
callbacks.append(execution_callback)
callback_manager = CallbackManager(callbacks)
builder.build_llm(callback_manager)
builder.build_global_tools()
memory: BaseChatMemory = self.get_or_create_memory(session)
tools = [
*builder.get_global_tools(),
*ToolsFactory.create_per_session_tools(
self.toolsets,
get_session=lambda: (session, self.executors[session]),
),
]
for tool in tools:
tool.callback_manager = callback_manager
executor = AgentExecutor.from_agent_and_tools(
agent=builder.get_agent(),
tools=tools,
memory=memory,
callback_manager=callback_manager,
verbose=True,
)
self.executors[session] = executor
return executor
@staticmethod
def create(toolsets: list[BaseToolSet]) -> "AgentManager":
return AgentManager(
toolsets=toolsets,
)

@ -1,18 +1,27 @@
import os
import re
import logging
from pathlib import Path
from typing import Dict, List
from swarms.agents.utils.manager import AgentManager
from swarms.agents.utils.AgentManager import AgentManager
from swarms.utils.main import BaseHandler, FileHandler, FileType
from swarms.tools.main import CsvToDataframe, ExitConversation, RequestsGet, CodeEditor, Terminal
from swarms.tools.main import BaseToolSet
from swarms.utils.main import StaticUploader
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
BASE_DIR = Path(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
os.chdir(BASE_DIR / os.environ["PLAYGROUND_DIR"])
class UltraNode:
class WorkerUltraNode:
def __init__(self, objective: str):
if not isinstance(objective, str):
raise TypeError("Objective must be a string")
if not objective:
raise ValueError("Objective cannot be empty")
toolsets: List[BaseToolSet] = [
Terminal(),
CodeEditor(),
@ -21,7 +30,7 @@ class UltraNode:
]
handlers: Dict[FileType, BaseHandler] = {FileType.DATAFRAME: CsvToDataframe()}
if os.environ["USE_GPU"]:
if os.environ.get("USE_GPU", False):
import torch
from swarms.tools.main import ImageCaptioning
from swarms.tools.main import ImageEditing, InstructPix2Pix, Text2Image, VisualQuestionAnswering
@ -37,14 +46,21 @@ class UltraNode:
)
handlers[FileType.IMAGE] = ImageCaptioning("cuda")
self.agent_manager = AgentManager.create(toolsets=toolsets)
self.file_handler = FileHandler(handlers=handlers, path=BASE_DIR)
self.uploader = StaticUploader.from_settings(
path=BASE_DIR / "static", endpoint="static"
)
try:
self.agent_manager = AgentManager.create(toolsets=toolsets)
self.file_handler = FileHandler(handlers=handlers, path=BASE_DIR)
self.uploader = StaticUploader.from_settings(
path=BASE_DIR / "static", endpoint="static"
)
self.session = self.agent_manager.create_executor(objective)
self.session = self.agent_manager.create_executor(objective)
except Exception as e:
logging.error(f"Error while initializing WorkerUltraNode: {str(e)}")
raise e
def execute_task(self):
# Now the prompt is not needed as an argument
@ -53,6 +69,7 @@ class UltraNode:
try:
res = self.session({"input": promptedQuery})
except Exception as e:
logging.error(f"Error while executing task: {str(e)}")
return {"answer": str(e), "files": []}
files = re.findall(r"\[file://\S*\]", res["output"])
@ -64,9 +81,15 @@ class UltraNode:
}
def execute(self):
# The prompt is not needed here either
return self.execute_task()
try:
# The prompt is not needed here either
return self.execute_task()
except Exception as e:
logging.error(f"Error while executing: {str(e)}")
raise e
# from worker_node import UltraNode

Loading…
Cancel
Save