parent
4fbd2281f4
commit
d572cafce6
@ -0,0 +1,41 @@
|
||||
from swarms import Anthropic, Agent, SequentialWorkflow
|
||||
|
||||
|
||||
# Initialize the language model agent (e.g., GPT-3)
|
||||
|
||||
llm = Anthropic()
|
||||
|
||||
|
||||
# Initialize agents for individual tasks
|
||||
|
||||
agent1 = Agent(
|
||||
agent_name="Blog generator", llm=llm, max_loops=1, dashboard=False
|
||||
)
|
||||
|
||||
agent2 = Agent(
|
||||
agent_name="summarizer", llm=llm, max_loops=1, dashboard=False
|
||||
)
|
||||
|
||||
|
||||
# Create the Sequential workflow
|
||||
|
||||
workflow = SequentialWorkflow(
|
||||
max_loops=1, objective="Create a full blog and then summarize it"
|
||||
)
|
||||
|
||||
|
||||
# Add tasks to the workflow
|
||||
|
||||
workflow.add(
|
||||
"Generate a 10,000 word blog on health and wellness.", agent1
|
||||
) # this task will be executed task,
|
||||
|
||||
workflow.add(
|
||||
"Summarize the generated blog", agent2
|
||||
) # then the next agent will accomplish this task
|
||||
|
||||
|
||||
# Run the workflow
|
||||
|
||||
out = workflow.run()
|
||||
print(f"{out}")
|
@ -1,203 +1,134 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from termcolor import colored
|
||||
|
||||
# from swarms.utils.logger import logger
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Optional
|
||||
from swarms.structs.agent import Agent
|
||||
from swarms.structs.conversation import Conversation
|
||||
from swarms.structs.task import Task
|
||||
from swarms.utils.loguru_logger import logger
|
||||
from swarms.utils.try_except_wrapper import try_except_wrapper
|
||||
|
||||
|
||||
# SequentialWorkflow class definition using dataclasses
|
||||
@dataclass
|
||||
class SequentialWorkflow:
|
||||
"""
|
||||
SequentialWorkflow class for running a sequence of task_pool using N number of autonomous agents.
|
||||
|
||||
Args:
|
||||
max_loops (int): The maximum number of times to run the workflow.
|
||||
dashboard (bool): Whether to display the dashboard for the workflow.
|
||||
|
||||
|
||||
Attributes:
|
||||
task_pool (List[Task]): The list of task_pool to execute.
|
||||
max_loops (int): The maximum number of times to run the workflow.
|
||||
dashboard (bool): Whether to display the dashboard for the workflow.
|
||||
|
||||
|
||||
Examples:
|
||||
>>> from swarms.models import OpenAIChat
|
||||
>>> from swarms.structs import SequentialWorkflow
|
||||
>>> llm = OpenAIChat(openai_api_key="")
|
||||
>>> workflow = SequentialWorkflow(max_loops=1)
|
||||
>>> workflow.add("What's the weather in miami", llm)
|
||||
>>> workflow.add("Create a report on these metrics", llm)
|
||||
>>> workflow.run()
|
||||
>>> workflow.task_pool
|
||||
|
||||
"""
|
||||
|
||||
name: str = None
|
||||
name: str = "Sequential Workflow"
|
||||
description: str = None
|
||||
task_pool: List[Task] = None
|
||||
objective: str = None
|
||||
max_loops: int = 1
|
||||
autosave: bool = False
|
||||
saved_state_filepath: Optional[str] = "sequential_workflow_state.json"
|
||||
restore_state_filepath: Optional[str] = None
|
||||
dashboard: bool = False
|
||||
agents: List[Agent] = None
|
||||
agent_pool: List[Agent] = field(default_factory=list)
|
||||
# task_pool: List[str] = field(
|
||||
# default_factory=list
|
||||
# ) # List to store tasks
|
||||
|
||||
def __post_init__(self):
|
||||
self.conversation = Conversation(
|
||||
system_prompt=f"Objective: {self.description}",
|
||||
time_enabled=True,
|
||||
autosave=True,
|
||||
)
|
||||
|
||||
# Logging
|
||||
logger.info("Number of agents activated:")
|
||||
if self.agents:
|
||||
logger.info(f"Agents: {len(self.agents)}")
|
||||
else:
|
||||
logger.info("No agents activated.")
|
||||
|
||||
if self.task_pool:
|
||||
logger.info(f"Task Pool Size: {len(self.task_pool)}")
|
||||
else:
|
||||
logger.info("Task Pool is empty.")
|
||||
|
||||
def add(
|
||||
self,
|
||||
task: Optional[Task] = None,
|
||||
tasks: Optional[List[Task]] = None,
|
||||
*args,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
"""
|
||||
Add a task to the workflow.
|
||||
|
||||
Args:
|
||||
agent (Union[Callable, Agent]): The model or agent to execute the task.
|
||||
task (str): The task description or the initial input for the Agent.
|
||||
|
||||
*args: Additional arguments to pass to the task execution.
|
||||
**kwargs: Additional keyword arguments to pass to the task execution.
|
||||
"""
|
||||
for agent in self.agents:
|
||||
out = agent(str(self.description))
|
||||
self.conversation.add(agent.agent_name, out)
|
||||
prompt = self.conversation.return_history_as_string()
|
||||
out = agent(prompt)
|
||||
|
||||
return out
|
||||
# If objective exists then set it
|
||||
if self.objective is not None:
|
||||
self.conversation.system_prompt = self.objective
|
||||
|
||||
def reset_workflow(self) -> None:
|
||||
"""Resets the workflow by clearing the results of each task."""
|
||||
try:
|
||||
for task in self.task_pool:
|
||||
task.result = None
|
||||
logger.info(
|
||||
f"[INFO][SequentialWorkflow] Reset task {task} in"
|
||||
" workflow"
|
||||
)
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
colored(f"Error resetting workflow: {error}", "red"),
|
||||
)
|
||||
def workflow_bootup(self):
|
||||
logger.info(f"{self.name} is activating...")
|
||||
|
||||
def get_task_results(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Returns the results of each task in the workflow.
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: The results of each task in the workflow
|
||||
"""
|
||||
try:
|
||||
return {
|
||||
task.description: task.result for task in self.task_pool
|
||||
}
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
colored(f"Error getting task results: {error}", "red"),
|
||||
)
|
||||
for agent in self.agent_pool:
|
||||
logger.info(f"Agent {agent.agent_name} Activated")
|
||||
|
||||
def remove_task(self, task: Task) -> None:
|
||||
"""Remove task_pool from sequential workflow"""
|
||||
try:
|
||||
self.task_pool.remove(task)
|
||||
logger.info(
|
||||
f"[INFO][SequentialWorkflow] Removed task {task} from"
|
||||
" workflow"
|
||||
)
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
colored(
|
||||
f"Error removing task from workflow: {error}",
|
||||
"red",
|
||||
),
|
||||
@try_except_wrapper
|
||||
def add(self, task: str, agent: Agent, *args, **kwargs):
|
||||
self.agent_pool.append(agent)
|
||||
# self.task_pool.append(
|
||||
# task
|
||||
# ) # Store tasks corresponding to each agent
|
||||
|
||||
return self.conversation.add(
|
||||
role=agent.agent_name, content=task, *args, **kwargs
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
"""
|
||||
Run the workflow.
|
||||
def reset_workflow(self) -> None:
|
||||
self.conversation = {}
|
||||
|
||||
Raises:
|
||||
ValueError: If an Agent instance is used as a task and the 'task' argument is not provided.
|
||||
# @try_except_wrapper
|
||||
# WITH TASK POOL
|
||||
# def run(self):
|
||||
# if not self.agent_pool:
|
||||
# raise ValueError("No agents have been added to the workflow.")
|
||||
|
||||
"""
|
||||
self.workflow_bootup()
|
||||
loops = 0
|
||||
while loops < self.max_loops:
|
||||
for i, agent in enumerate(self.agents):
|
||||
logger.info(f"Agent {i+1} is executing the task.")
|
||||
out = agent(self.description)
|
||||
self.conversation.add(agent.agent_name, str(out))
|
||||
prompt = self.conversation.return_history_as_string()
|
||||
print(prompt)
|
||||
print("Next agent...........")
|
||||
out = agent(prompt)
|
||||
|
||||
return out
|
||||
# try:
|
||||
# self.workflow_bootup()
|
||||
# loops = 0
|
||||
# prompt = None # Initialize prompt to None; will be updated with the output of each agent
|
||||
# while loops < self.max_loops:
|
||||
# for i in range(len(self.task_pool)):
|
||||
# task = self.task_pool[i]
|
||||
# # Check if the current task can be executed
|
||||
# if task.result is None:
|
||||
# # Get the inputs for the current task
|
||||
# task.context(task)
|
||||
|
||||
# result = task.execute()
|
||||
|
||||
# # Pass the inputs to the next task
|
||||
# if i < len(self.task_pool) - 1:
|
||||
# next_task = self.task_pool[i + 1]
|
||||
# next_task.description = result
|
||||
|
||||
# # Execute the current task
|
||||
# task.execute()
|
||||
|
||||
# # Autosave the workflow state
|
||||
# if self.autosave:
|
||||
# self.save_workflow_state(
|
||||
# "sequential_workflow_state.json"
|
||||
# for i, agent in enumerate(self.agent_pool):
|
||||
# task = (
|
||||
# self.task_pool[i] if prompt is None else prompt
|
||||
# ) # Use initial task or the output from the previous agent
|
||||
# logger.info(
|
||||
# f"Agent: {agent.agent_name} {i+1} is executing the task"
|
||||
# )
|
||||
|
||||
# self.workflow_shutdown()
|
||||
# loops += 1
|
||||
# except Exception as e:
|
||||
# logger.info("\n")
|
||||
# output = agent.run(task)
|
||||
# if output is None:
|
||||
# logger.error(
|
||||
# colored(
|
||||
# (
|
||||
# "Error initializing the Sequential workflow:"
|
||||
# f" {e} try optimizing your inputs like the"
|
||||
# " agent class and task description"
|
||||
# ),
|
||||
# "red",
|
||||
# attrs=["bold", "underline"],
|
||||
# )
|
||||
# f"Agent {i+1} returned None for task: {task}"
|
||||
# )
|
||||
# raise ValueError(f"Agent {i+1} returned None.")
|
||||
# self.conversation.add(agent.agent_name, output)
|
||||
# prompt = output # Update prompt with current agent's output to pass to the next agent
|
||||
# logger.info(f"Prompt: {prompt}")
|
||||
# loops += 1
|
||||
# return self.conversation.return_history_as_string()
|
||||
@try_except_wrapper
|
||||
def run(self):
|
||||
if not self.agent_pool:
|
||||
raise ValueError("No agents have been added to the workflow.")
|
||||
|
||||
self.workflow_bootup()
|
||||
loops = 0
|
||||
while loops < self.max_loops:
|
||||
previous_output = None # Initialize to None; will hold the output of the previous agent
|
||||
for i, agent in enumerate(self.agent_pool):
|
||||
# Fetch the last task specific to this agent from the conversation history
|
||||
tasks_for_agent = [
|
||||
msg["content"]
|
||||
for msg in self.conversation.conversation_history
|
||||
if msg["role"] == agent.agent_name
|
||||
]
|
||||
task = tasks_for_agent[-1] if tasks_for_agent else None
|
||||
|
||||
if task is None and previous_output is not None:
|
||||
# If no specific task for this agent, use the output from the previous agent
|
||||
task = previous_output
|
||||
|
||||
if task is None:
|
||||
# If no initial task is found, and there's no previous output, log error and skip this agent
|
||||
logger.error(
|
||||
f"No initial task found for agent {agent.agent_name}, and no previous output to use."
|
||||
)
|
||||
continue
|
||||
|
||||
logger.info(
|
||||
f" \n Agent {i+1} ({agent.agent_name}) is executing the task: {task} \n"
|
||||
)
|
||||
|
||||
# Space the log
|
||||
|
||||
output = agent.run(task)
|
||||
if output is None:
|
||||
logger.error(
|
||||
f"Agent {agent.agent_name} returned None for task: {task}"
|
||||
)
|
||||
raise ValueError(
|
||||
f"Agent {agent.agent_name} returned None."
|
||||
)
|
||||
|
||||
# Update the conversation history with the new output using agent's role
|
||||
self.conversation.add(
|
||||
role=agent.agent_name, content=output
|
||||
)
|
||||
previous_output = output # Update the previous_output to pass to the next agent
|
||||
|
||||
loops += 1
|
||||
return self.conversation.return_history_as_string()
|
||||
|
Loading…
Reference in new issue