From a01711fd231c3922e0e2aec281a29464d64ec130 Mon Sep 17 00:00:00 2001 From: Kye Date: Thu, 14 Dec 2023 11:40:06 -0800 Subject: [PATCH] [SEQUENTIALWORKFLOW][FIX] --- gemini.py | 4 +- swarms/memory/chroma_db.py | 17 ++- swarms/structs/agent.py | 8 -- swarms/structs/sequential_workflow.py | 74 +----------- swarms/structs/task.py | 168 +++++++++----------------- tests/structs/test_task.py | 33 +++++ 6 files changed, 106 insertions(+), 198 deletions(-) diff --git a/gemini.py b/gemini.py index 42fa4e74..79058efb 100644 --- a/gemini.py +++ b/gemini.py @@ -7,7 +7,9 @@ load_dotenv() api_key = os.environ["GEMINI_API_KEY"] # Initialize the model -model = Gemini(gemini_api_key=api_key) +model = Gemini( + gemini_api_key="AIzaSyCUMFvG3TwwuZIniSpvDeP2AW5USyJUgJM" +) # Establish the prompt and image task = "What is your name" diff --git a/swarms/memory/chroma_db.py b/swarms/memory/chroma_db.py index 189d6ec2..8e200974 100644 --- a/swarms/memory/chroma_db.py +++ b/swarms/memory/chroma_db.py @@ -62,7 +62,7 @@ class ChromaDB: top_results_num: int = 3, limit_tokens: Optional[int] = 1000, *args, - **kwargs + **kwargs, ): self.metric = metric self.RESULTS_STORE_NAME = RESULTS_STORE_NAME @@ -93,7 +93,9 @@ class ChromaDB: embedding_function=embedding_function, ) - def add(self, task: Dict, result: str, result_id: str, *args, **kwargs): + def add( + self, task: Dict, result: str, result_id: str, *args, **kwargs + ): """Adds a result to the ChromaDB collection Args: @@ -140,19 +142,14 @@ class ChromaDB: "result": result, }, *args, - **kwargs + **kwargs, ) except Exception as error: print( colored(f"Error adding to ChromaDB: {error}", "red") ) - def query( - self, - query: str, - *args, - **kwargs - ) -> List[dict]: + def query(self, query: str, *args, **kwargs) -> List[dict]: """Queries the ChromaDB collection with a query for the top results Args: @@ -171,7 +168,7 @@ class ChromaDB: n_results=min(self.top_results_num, count), include=["metadatas"], *args, - **kwargs + **kwargs, ) out = [item["task"] for item in results["metadatas"][0]] out = limit_tokens_from_string( diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 5ee2da6a..9d48791e 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -264,14 +264,6 @@ class Agent: if preset_stopping_token: self.stopping_token = "" - # If memory then add the json to the memory vector database - if memory: - # Add all of the state to the memory - self.add_message_to_memory_db( - {"message": self.state_to_str()}, - {"agent_id": self.id}, - ) - # If tools exist then add the tool docs usage to the sop if self.tools: self.sop_list.append( diff --git a/swarms/structs/sequential_workflow.py b/swarms/structs/sequential_workflow.py index 49c529cb..41192a6f 100644 --- a/swarms/structs/sequential_workflow.py +++ b/swarms/structs/sequential_workflow.py @@ -6,74 +6,7 @@ from typing import Any, Callable, Dict, List, Optional, Union from termcolor import colored from swarms.structs.agent import Agent - - -# Define a generic Task that can handle different types of callable objects -@dataclass -class Task: - """ - Task class for running a task in a sequential workflow. - - - Args: - description (str): The description of the task. - agent (Union[Callable, Agent]): The model or agent to execute the task. - args (List[Any]): Additional arguments to pass to the task execution. - kwargs (Dict[str, Any]): Additional keyword arguments to pass to the task execution. - result (Any): The result of the task execution. - history (List[Any]): The history of the task execution. - - Methods: - execute: Execute the task. - - - Examples: - >>> from swarms.structs import Task, Agent - >>> from swarms.models import OpenAIChat - >>> agent = Agent(llm=OpenAIChat(openai_api_key=""), max_loops=1, dashboard=False) - >>> task = Task(description="What's the weather in miami", agent=agent) - >>> task.execute() - >>> task.result - - """ - - description: str - agent: Union[Callable, Agent] - args: List[Any] = field(default_factory=list) - kwargs: Dict[str, Any] = field(default_factory=dict) - result: Any = None - history: List[Any] = field(default_factory=list) - # logger = logging.getLogger(__name__) - - def execute(self): - """ - Execute the task. - - Raises: - ValueError: If a Agent instance is used as a task and the 'task' argument is not provided. - """ - if isinstance(self.agent, Agent): - # Add a prompt to notify the Agent of the sequential workflow - if "prompt" in self.kwargs: - self.kwargs["prompt"] += ( - f"\n\nPrevious output: {self.result}" - if self.result - else "" - ) - else: - self.kwargs["prompt"] = ( - f"Main task: {self.description}" - + ( - f"\n\nPrevious output: {self.result}" - if self.result - else "" - ) - ) - self.result = self.agent.run(*self.args, **self.kwargs) - else: - self.result = self.agent(*self.args, **self.kwargs) - - self.history.append(self.result) +from swarms.structs.task import Task # SequentialWorkflow class definition using dataclasses @@ -361,7 +294,10 @@ class SequentialWorkflow: ) def workflow_bootup(self, **kwargs) -> None: - """Bootup the workflow.""" + """ + Workflow bootup. + + """ print( colored( """ diff --git a/swarms/structs/task.py b/swarms/structs/task.py index 8c6e6adc..caf812ae 100644 --- a/swarms/structs/task.py +++ b/swarms/structs/task.py @@ -1,132 +1,80 @@ -from dataclass import dataclass, field +from dataclasses import dataclass, field +from typing import ( + Any, + Callable, + Dict, + List, + Optional, + Sequence, + Union, +) + from swarms.structs.agent import Agent -from typing import Optional -from typing import List, Dict, Any, Sequence +# Define a generic Task that can handle different types of callable objects @dataclass class Task: """ - Task is a unit of work that can be executed by a set of agents. + Task class for running a task in a sequential workflow. - A task is defined by a task name and a set of agents that can execute the task. - The task can also have a set of dependencies, which are the names of other tasks - that must be executed before this task can be executed. Args: - id (str): The name of the task. - description (Optional[str]): A description of the task. - task (str): The name of the task. - result (Any): The result of the task. - agents (Sequence[Agent]): A list of agents that can execute the task. - dependencies (List[str], optional): A list of task names that must be executed before this task can be executed. Defaults to []. - args (List[Any], optional): A list of arguments to pass to the agents. Defaults to field(default_factory=list). - kwargs (List[Any], optional): A list of keyword arguments to pass to the agents. Defaults to field(default_factory=list). + description (str): The description of the task. + agent (Union[Callable, Agent]): The model or agent to execute the task. + args (List[Any]): Additional arguments to pass to the task execution. + kwargs (Dict[str, Any]): Additional keyword arguments to pass to the task execution. + result (Any): The result of the task execution. + history (List[Any]): The history of the task execution. Methods: - execute: Executes the task by passing the results of the parent tasks to the agents. - - Examples: - import os - from swarms.models import OpenAIChat - from swarms.structs import Agent - from swarms.structs.sequential_workflow import SequentialWorkflow - from dotenv import load_dotenv - - load_dotenv() - - # Load the environment variables - api_key = os.getenv("OPENAI_API_KEY") - - - # Initialize the language agent - llm = OpenAIChat( - openai_api_key=api_key, - temperature=0.5, - max_tokens=3000, - ) - - - # Initialize the agent with the language agent - agent1 = Agent(llm=llm, max_loops=1) - - # Create another agent for a different task - agent2 = Agent(llm=llm, max_loops=1) - - # Create the workflow - workflow = SequentialWorkflow(max_loops=1) + execute: Execute the task. - # Add tasks to the workflow - workflow.add( - agent1, "Generate a 10,000 word blog on health and wellness.", - ) - # Suppose the next task takes the output of the first task as input - workflow.add( - agent2, "Summarize the generated blog", - ) - - # Run the workflow - workflow.run() - - # Output the results - for task in workflow.tasks: - print(f"Task: {task.description}, Result: {task.result}") + Examples: + >>> from swarms.structs import Task, Agent + >>> from swarms.models import OpenAIChat + >>> agent = Agent(llm=OpenAIChat(openai_api_key=""), max_loops=1, dashboard=False) + >>> task = Task(description="What's the weather in miami", agent=agent) + >>> task.execute() + >>> task.result """ - def __init__( - self, - id: str, - description: Optional[str], - task: str, - result: Any, - agents: Sequence[Agent], - dependencies: List[str] = [], - args: List[Any] = field(default_factory=list), - kwargs: List[Any] = field(default_factory=list), - ): - self.id = id - self.description = description - self.task = task - self.result = result - self.agents = agents - self.dependencies = dependencies - self.results = [] - self.args = args - self.kwargs = kwargs + description: str + agent: Union[Callable, Agent] + args: List[Any] = field(default_factory=list) + kwargs: Dict[str, Any] = field(default_factory=dict) + result: Any = None + history: List[Any] = field(default_factory=list) + # logger = logging.getLogger(__name__) - def execute(self, parent_results: Dict[str, Any]): - """Executes the task by passing the results of the parent tasks to the agents. - - Args: - parent_results (Dict[str, Any]): A dictionary of task names and their results. + def execute(self): + """ + Execute the task. - Examples: + Raises: + ValueError: If a Agent instance is used as a task and the 'task' argument is not provided. """ - args = [parent_results[dep] for dep in self.dependencies] - for agent in self.agents: - if isinstance(agent, Agent): - if "prompt" in self.kwargs: - self.kwargs["prompt"] += ( - f"\n\nPrevious output: {self.results[-1]}" - if self.results + if isinstance(self.agent, Agent): + # Add a prompt to notify the Agent of the sequential workflow + if "prompt" in self.kwargs: + self.kwargs["prompt"] += ( + f"\n\nPrevious output: {self.result}" + if self.result + else "" + ) + else: + self.kwargs["prompt"] = ( + f"Main task: {self.description}" + + ( + f"\n\nPrevious output: {self.result}" + if self.result else "" ) - else: - self.kwargs["prompt"] = ( - f"Main task: {self.description}" - + ( - f"\n\nPrevious output: {self.results[-1]}" - if self.results - else "" - ) - ) - result = agent.run( - self.description, *args, **self.kwargs ) - else: - result = agent(self.description, *args, **self.kwargs) - self.results.append(result) - args = [result] - self.history.append(result) + self.result = self.agent.run(*self.args, **self.kwargs) + else: + self.result = self.agent(*self.args, **self.kwargs) + + self.history.append(self.result) diff --git a/tests/structs/test_task.py b/tests/structs/test_task.py index 5db822d4..85bc1daf 100644 --- a/tests/structs/test_task.py +++ b/tests/structs/test_task.py @@ -108,3 +108,36 @@ def test_task_execute_with_mocked_agents(task, mocker): parent_results = {} task.execute(parent_results) assert len(task.results) == 5 + + +def test_task_creation(): + agent = Agent() + task = Task(id="1", task="Task1", result=None, agents=[agent]) + assert task.id == "1" + assert task.task == "Task1" + assert task.result is None + assert task.agents == [agent] + +def test_task_with_dependencies(): + agent = Agent() + task = Task(id="2", task="Task2", result=None, agents=[agent], dependencies=["Task1"]) + assert task.dependencies == ["Task1"] + +def test_task_with_args(): + agent = Agent() + task = Task(id="3", task="Task3", result=None, agents=[agent], args=["arg1", "arg2"]) + assert task.args == ["arg1", "arg2"] + +def test_task_with_kwargs(): + agent = Agent() + task = Task(id="4", task="Task4", result=None, agents=[agent], kwargs={"kwarg1": "value1"}) + assert task.kwargs == {"kwarg1": "value1"} + +# ... continue creating tests for different scenarios + +# Test execute method +def test_execute(): + agent = Agent() + task = Task(id="5", task="Task5", result=None, agents=[agent]) + # Assuming execute method returns True on successful execution + assert task.execute() == True \ No newline at end of file