diff --git a/api/container.py b/api/container.py index 1ffeea25..251c697a 100644 --- a/api/container.py +++ b/api/container.py @@ -5,7 +5,7 @@ from typing import Dict, List from fastapi.templating import Jinja2Templates -from swarms.agents.utils.manager import AgentManager +from swarms.agents.utils.AgentManager import AgentManager from swarms.utils.main import BaseHandler, FileHandler, FileType from swarms.tools.main import CsvToDataframe, ExitConversation, RequestsGet, CodeEditor, Terminal diff --git a/swarms/__init__.py b/swarms/__init__.py index 1a936d07..44a0bdb4 100644 --- a/swarms/__init__.py +++ b/swarms/__init__.py @@ -1,3 +1,3 @@ # from swarms import Swarms, swarm from swarms.swarms import Swarms, swarm -from swarms.agents import worker_node, UltraNode \ No newline at end of file +from swarms.agents import worker_node, WorkerUltraNode \ No newline at end of file diff --git a/swarms/agents/utils/builder.py b/swarms/agents/utils/AgentBuilder.py similarity index 95% rename from swarms/agents/utils/builder.py rename to swarms/agents/utils/AgentBuilder.py index 5a34275e..2b11930b 100644 --- a/swarms/agents/utils/builder.py +++ b/swarms/agents/utils/AgentBuilder.py @@ -11,7 +11,7 @@ from langchain.callbacks.base import BaseCallbackManager from .chat_agent import ConversationalChatAgent from .llm import ChatOpenAI -from .parser import EvalOutputParser +from .EvalOutputParser import EvalOutputParser class AgentBuilder: @@ -21,9 +21,9 @@ class AgentBuilder: self.global_tools: list = None self.toolsets = toolsets - def build_llm(self, callback_manager: BaseCallbackManager = None): + def build_llm(self, callback_manager: BaseCallbackManager = None, openai_api_key: str = None): self.llm = ChatOpenAI( - temperature=0, callback_manager=callback_manager, verbose=True + temperature=0, callback_manager=callback_manager, verbose=True, openai_api_key=openai_api_key ) self.llm.check_access() diff --git a/swarms/agents/utils/AgentManager.py b/swarms/agents/utils/AgentManager.py new file mode 100644 index 00000000..4582590a --- /dev/null +++ b/swarms/agents/utils/AgentManager.py @@ -0,0 +1,86 @@ +from typing import Dict, Optional +import logging + +from celery import Task + +from langchain.agents.agent import AgentExecutor +from langchain.callbacks.manager import CallbackManager +from langchain.chains.conversation.memory import ConversationBufferMemory +from langchain.memory.chat_memory import BaseChatMemory + +from swarms.tools.main import BaseToolSet, ToolsFactory +from .AgentBuilder import AgentBuilder +from .callback import EVALCallbackHandler, ExecutionTracingCallbackHandler + + +CallbackManager.set_handler(handler=EVALCallbackHandler()) + +class AgentManager: + def __init__(self, toolsets: list[BaseToolSet] = []): + if not isinstance(toolsets, list): + raise TypeError("Toolsets must be a list") + self.toolsets: list[BaseToolSet] = toolsets + self.memories: Dict[str, BaseChatMemory] = {} + self.executors: Dict[str, AgentExecutor] = {} + + def create_memory(self) -> BaseChatMemory: + return ConversationBufferMemory(memory_key="chat_history", return_messages=True) + + def get_or_create_memory(self, session: str) -> BaseChatMemory: + if not isinstance(session, str): + raise TypeError("Session must be a string") + if not session: + raise ValueError("Session is empty") + if not (session in self.memories): + self.memories[session] = self.create_memory() + return self.memories[session] + + def create_executor(self, session: str, execution: Optional[Task] = None) -> AgentExecutor: + try: + builder = AgentBuilder(self.toolsets) + builder.build_parser() + + callbacks = [] + eval_callback = EVALCallbackHandler() + eval_callback.set_parser(builder.get_parser()) + callbacks.append(eval_callback) + if execution: + execution_callback = ExecutionTracingCallbackHandler(execution) + execution_callback.set_parser(builder.get_parser()) + callbacks.append(execution_callback) + + callback_manager = CallbackManager(callbacks) + + builder.build_llm(callback_manager) + builder.build_global_tools() + + memory: BaseChatMemory = self.get_or_create_memory(session) + tools = [ + *builder.get_global_tools(), + *ToolsFactory.create_per_session_tools( + self.toolsets, + get_session=lambda: (session, self.executors[session]), + ), + ] + + for tool in tools: + tool.callback_manager = callback_manager + + executor = AgentExecutor.from_agent_and_tools( + agent=builder.get_agent(), + tools=tools, + memory=memory, + callback_manager=callback_manager, + verbose=True, + ) + self.executors[session] = executor + return executor + except Exception as e: + logging.error(f"Error while creating executor: {str(e)}") + raise e + + @staticmethod + def create(toolsets: list[BaseToolSet]) -> "AgentManager": + if not isinstance(toolsets, list): + raise TypeError("Toolsets must be a list") + return AgentManager(toolsets=toolsets) \ No newline at end of file diff --git a/swarms/agents/utils/parser.py b/swarms/agents/utils/EvalOutputParser.py similarity index 100% rename from swarms/agents/utils/parser.py rename to swarms/agents/utils/EvalOutputParser.py diff --git a/swarms/agents/utils/manager.py b/swarms/agents/utils/manager.py deleted file mode 100644 index 72f2f978..00000000 --- a/swarms/agents/utils/manager.py +++ /dev/null @@ -1,82 +0,0 @@ -from typing import Dict, Optional -# from celery import Task - -from langchain.agents.agent import AgentExecutor -from langchain.callbacks.manager import CallbackManager -# from langchain.callbacks.base import set_handler -from langchain.chains.conversation.memory import ConversationBufferMemory -from langchain.memory.chat_memory import BaseChatMemory - -from swarms.tools.main import BaseToolSet, ToolsFactory - -from .builder import AgentBuilder -from .callback import EVALCallbackHandler, ExecutionTracingCallbackHandler - - -CallbackManager.set_handler(handler=EVALCallbackHandler()) - - -class AgentManager: - def __init__( - self, - toolsets: list[BaseToolSet] = [], - ): - self.toolsets: list[BaseToolSet] = toolsets - self.memories: Dict[str, BaseChatMemory] = {} - self.executors: Dict[str, AgentExecutor] = {} - - def create_memory(self) -> BaseChatMemory: - return ConversationBufferMemory(memory_key="chat_history", return_messages=True) - - def get_or_create_memory(self, session: str) -> BaseChatMemory: - if not (session in self.memories): - self.memories[session] = self.create_memory() - return self.memories[session] - - def create_executor( - self, session: str, execution: Optional[Task] = None - ) -> AgentExecutor: - builder = AgentBuilder(self.toolsets) - builder.build_parser() - - callbacks = [] - eval_callback = EVALCallbackHandler() - eval_callback.set_parser(builder.get_parser()) - callbacks.append(eval_callback) - if execution: - execution_callback = ExecutionTracingCallbackHandler(execution) - execution_callback.set_parser(builder.get_parser()) - callbacks.append(execution_callback) - - callback_manager = CallbackManager(callbacks) - - builder.build_llm(callback_manager) - builder.build_global_tools() - - memory: BaseChatMemory = self.get_or_create_memory(session) - tools = [ - *builder.get_global_tools(), - *ToolsFactory.create_per_session_tools( - self.toolsets, - get_session=lambda: (session, self.executors[session]), - ), - ] - - for tool in tools: - tool.callback_manager = callback_manager - - executor = AgentExecutor.from_agent_and_tools( - agent=builder.get_agent(), - tools=tools, - memory=memory, - callback_manager=callback_manager, - verbose=True, - ) - self.executors[session] = executor - return executor - - @staticmethod - def create(toolsets: list[BaseToolSet]) -> "AgentManager": - return AgentManager( - toolsets=toolsets, - ) \ No newline at end of file diff --git a/swarms/agents/workers/worker_ultranode.py b/swarms/agents/workers/worker_ultranode.py index 04108a84..6d043c28 100644 --- a/swarms/agents/workers/worker_ultranode.py +++ b/swarms/agents/workers/worker_ultranode.py @@ -1,18 +1,27 @@ import os +import re +import logging from pathlib import Path from typing import Dict, List -from swarms.agents.utils.manager import AgentManager +from swarms.agents.utils.AgentManager import AgentManager from swarms.utils.main import BaseHandler, FileHandler, FileType from swarms.tools.main import CsvToDataframe, ExitConversation, RequestsGet, CodeEditor, Terminal from swarms.tools.main import BaseToolSet from swarms.utils.main import StaticUploader +logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') + BASE_DIR = Path(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) os.chdir(BASE_DIR / os.environ["PLAYGROUND_DIR"]) -class UltraNode: +class WorkerUltraNode: def __init__(self, objective: str): + if not isinstance(objective, str): + raise TypeError("Objective must be a string") + if not objective: + raise ValueError("Objective cannot be empty") + toolsets: List[BaseToolSet] = [ Terminal(), CodeEditor(), @@ -21,7 +30,7 @@ class UltraNode: ] handlers: Dict[FileType, BaseHandler] = {FileType.DATAFRAME: CsvToDataframe()} - if os.environ["USE_GPU"]: + if os.environ.get("USE_GPU", False): import torch from swarms.tools.main import ImageCaptioning from swarms.tools.main import ImageEditing, InstructPix2Pix, Text2Image, VisualQuestionAnswering @@ -37,14 +46,21 @@ class UltraNode: ) handlers[FileType.IMAGE] = ImageCaptioning("cuda") - self.agent_manager = AgentManager.create(toolsets=toolsets) - self.file_handler = FileHandler(handlers=handlers, path=BASE_DIR) - self.uploader = StaticUploader.from_settings( - path=BASE_DIR / "static", endpoint="static" - ) + try: + + self.agent_manager = AgentManager.create(toolsets=toolsets) + self.file_handler = FileHandler(handlers=handlers, path=BASE_DIR) + self.uploader = StaticUploader.from_settings( + path=BASE_DIR / "static", endpoint="static" + ) - self.session = self.agent_manager.create_executor(objective) + + self.session = self.agent_manager.create_executor(objective) + + except Exception as e: + logging.error(f"Error while initializing WorkerUltraNode: {str(e)}") + raise e def execute_task(self): # Now the prompt is not needed as an argument @@ -53,6 +69,7 @@ class UltraNode: try: res = self.session({"input": promptedQuery}) except Exception as e: + logging.error(f"Error while executing task: {str(e)}") return {"answer": str(e), "files": []} files = re.findall(r"\[file://\S*\]", res["output"]) @@ -63,10 +80,16 @@ class UltraNode: "files": [self.uploader.upload(file) for file in files], } + def execute(self): - # The prompt is not needed here either - return self.execute_task() + try: + + # The prompt is not needed here either + return self.execute_task() + except Exception as e: + logging.error(f"Error while executing: {str(e)}") + raise e # from worker_node import UltraNode