cleaning prs

pull/1080/head
Aksh Parekh 4 weeks ago
parent 2d5232f19b
commit 4953b39948

@ -1,88 +0,0 @@
from swarms.structs.agent import Agent
from swarms.structs.sequential_workflow import SequentialWorkflow
def run_workflow_with_streaming_callback(task, streaming_callback):
"""
Run a sequential workflow with two agents and a streaming callback.
Args:
task (str): The task to process through the workflow.
streaming_callback (callable): Function to handle streaming output.
Returns:
The final result from the workflow.
"""
agent1 = Agent(
name="Research Agent",
description="A research agent that can answer questions",
model_name="gpt-4o",
system_prompt=(
"You are a ResearchAgent. Your task is to research and gather "
"information about the given topic. Provide comprehensive research "
"findings and key insights."
),
max_loops=1,
interactive=True,
verbose=True,
)
agent2 = Agent(
name="Analysis Agent",
description="An analysis agent that draws conclusions from research",
model_name="gpt-4o-mini",
system_prompt=(
"You are an AnalysisAgent. Your task is to analyze the research "
"provided by the previous agent and draw meaningful conclusions. "
"Provide detailed analysis and actionable insights."
),
max_loops=1,
interactive=True,
verbose=True,
)
workflow = SequentialWorkflow(
id="research_analysis_workflow",
name="Research Analysis Workflow",
description="A sequential workflow that researches and analyzes topics",
agents=[agent1, agent2],
max_loops=1,
output_type="str",
streaming_callback=streaming_callback,
multi_agent_collab_prompt=True,
)
return workflow.run(task)
if __name__ == "__main__":
# ## REGULAR STREAMING CALLBACK
# def streaming_callback(token):
# print(token, end="", flush=True)
# run_workflow_with_streaming_callback(
# task="What are the latest advancements in AI?",
# streaming_callback=streaming_callback,
# )
# ## CUSTOM BUFFERING STREAMING_CALLBACK BASED ON DEV PREFERED
# buffer = []
# def streaming_callback(token):
# buffer.append(token)
# # Print in bigger chunks (e.g., every 20 tokens or on final flush)
# if len(buffer) >= 20 or token.endswith("\n"):
# print("".join(buffer), end="", flush=True)
# buffer.clear()
# # Optionally, you could add a flush at the end of the run if needed
# run_workflow_with_streaming_callback(
# task="What are the latest advancements in AI?",
# streaming_callback=streaming_callback,
# )
## NO ADDED STREAMING_CALLBACK
run_workflow_with_streaming_callback(
task="What are the latest advancements in AI?",
streaming_callback=None,
)

@ -1,723 +0,0 @@
import json
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Optional, Union
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.telemetry.main import log_agent_data
from swarms.utils.any_to_str import any_to_str
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.output_types import OutputType
from swarms.structs.swarm_id import swarm_id
logger = initialize_logger(log_folder="rearrange")
class AgentRearrange:
def __init__(
self,
id: str = swarm_id(),
name: str = "AgentRearrange",
description: str = "A swarm of agents for rearranging tasks.",
agents: List[Union[Agent, Callable]] = None,
flow: str = None,
max_loops: int = 1,
verbose: bool = True,
memory_system: Any = None,
human_in_the_loop: bool = False,
custom_human_in_the_loop: Optional[
Callable[[str], str]
] = None,
output_type: OutputType = "all",
autosave: bool = True,
rules: str = None,
team_awareness: bool = False,
time_enabled: bool = False,
message_id_on: bool = False,
streaming_callback: Optional[Callable[[str], None]] = None,
*args,
**kwargs,
):
self.name = name
self.description = description
self.id = id
self.agents = {agent.agent_name: agent for agent in agents}
self.flow = flow if flow is not None else ""
self.verbose = verbose
self.max_loops = max_loops if max_loops > 0 else 1
self.memory_system = memory_system
self.human_in_the_loop = human_in_the_loop
self.custom_human_in_the_loop = custom_human_in_the_loop
self.output_type = output_type
self.autosave = autosave
self.time_enabled = time_enabled
self.message_id_on = message_id_on
self.streaming_callback = streaming_callback
self.conversation = Conversation(
name=f"{self.name}-Conversation",
time_enabled=self.time_enabled,
token_count=False,
message_id_on=self.message_id_on,
)
if rules:
self.conversation.add("user", rules)
if team_awareness is True:
# agents_info = get_agents_info(agents=self.agents, team_name=self.name)
# Add sequential flow information if available
sequential_info = self._get_sequential_flow_info()
if sequential_info:
# agents_info += "\n\n" + sequential_info
self.conversation.add("system", sequential_info)
# self.conversation.add("system", agents_info)
self.reliability_check()
def reliability_check(self):
if self.agents is None or len(self.agents) == 0:
raise ValueError("Agents list cannot be None or empty")
if self.max_loops == 0:
raise ValueError("max_loops cannot be 0")
if self.flow is None or self.flow == "":
raise ValueError("flow cannot be None or empty")
if self.output_type is None or self.output_type == "":
raise ValueError("output_type cannot be None or empty")
def set_custom_flow(self, flow: str):
self.flow = flow
logger.info(f"Custom flow set: {flow}")
def add_agent(self, agent: Agent):
"""
Adds an agent to the swarm.
Args:
agent (Agent): The agent to be added.
"""
logger.info(f"Adding agent {agent.agent_name} to the swarm.")
self.agents[agent.agent_name] = agent
def track_history(
self,
agent_name: str,
result: str,
):
self.swarm_history[agent_name].append(result)
def remove_agent(self, agent_name: str):
"""
Removes an agent from the swarm.
Args:
agent_name (str): The name of the agent to be removed.
"""
del self.agents[agent_name]
def add_agents(self, agents: List[Agent]):
"""
Adds multiple agents to the swarm.
Args:
agents (List[Agent]): A list of Agent objects.
"""
for agent in agents:
self.agents[agent.agent_name] = agent
def validate_flow(self):
"""
Validates the flow pattern.
Raises:
ValueError: If the flow pattern is incorrectly formatted or contains duplicate agent names.
Returns:
bool: True if the flow pattern is valid.
"""
if "->" not in self.flow:
raise ValueError(
"Flow must include '->' to denote the direction of the task."
)
agents_in_flow = []
# Arrow
tasks = self.flow.split("->")
# For the task in tasks
for task in tasks:
agent_names = [name.strip() for name in task.split(",")]
# Loop over the agent names
for agent_name in agent_names:
if (
agent_name not in self.agents
and agent_name != "H"
):
raise ValueError(
f"Agent '{agent_name}' is not registered."
)
agents_in_flow.append(agent_name)
# # If the length of the agents does not equal the length of the agents in flow
# if len(set(agents_in_flow)) != len(agents_in_flow):
# raise ValueError(
# "Duplicate agent names in the flow are not allowed."
# )
logger.info(f"Flow: {self.flow} is valid.")
return True
def _get_sequential_awareness(
self, agent_name: str, tasks: List[str]
) -> str:
"""
Determines the sequential awareness information for an agent in a sequential flow.
Args:
agent_name (str): The name of the current agent.
tasks (List[str]): The list of tasks in the flow.
Returns:
str: A string describing the agents ahead and behind in the sequence.
"""
# Find the position of the current agent in the flow
agent_position = None
for i, task in enumerate(tasks):
agent_names = [name.strip() for name in task.split(",")]
if agent_name in agent_names:
agent_position = i
break
if agent_position is None:
return ""
awareness_info = []
# Check if there's an agent before (ahead in the sequence)
if agent_position > 0:
prev_task = tasks[agent_position - 1]
prev_agents = [
name.strip() for name in prev_task.split(",")
]
if (
prev_agents and prev_agents[0] != "H"
): # Skip human agents
awareness_info.append(
f"Agent ahead: {', '.join(prev_agents)}"
)
# Check if there's an agent after (behind in the sequence)
if agent_position < len(tasks) - 1:
next_task = tasks[agent_position + 1]
next_agents = [
name.strip() for name in next_task.split(",")
]
if (
next_agents and next_agents[0] != "H"
): # Skip human agents
awareness_info.append(
f"Agent behind: {', '.join(next_agents)}"
)
if awareness_info:
return (
f"Sequential awareness: {' | '.join(awareness_info)}"
)
return ""
def _get_sequential_flow_info(self) -> str:
"""
Gets information about the overall sequential flow structure.
Returns:
str: A string describing the sequential flow structure.
"""
if not self.flow or "->" not in self.flow:
return ""
tasks = self.flow.split("->")
flow_info = []
for i, task in enumerate(tasks):
agent_names = [name.strip() for name in task.split(",")]
if (
agent_names and agent_names[0] != "H"
): # Skip human agents
position_info = (
f"Step {i+1}: {', '.join(agent_names)}"
)
if i > 0:
prev_task = tasks[i - 1]
prev_agents = [
name.strip() for name in prev_task.split(",")
]
if prev_agents and prev_agents[0] != "H":
position_info += (
f" (follows: {', '.join(prev_agents)})"
)
if i < len(tasks) - 1:
next_task = tasks[i + 1]
next_agents = [
name.strip() for name in next_task.split(",")
]
if next_agents and next_agents[0] != "H":
position_info += (
f" (leads to: {', '.join(next_agents)})"
)
flow_info.append(position_info)
if flow_info:
return "Sequential Flow Structure:\n" + "\n".join(
flow_info
)
return ""
def get_agent_sequential_awareness(self, agent_name: str) -> str:
"""
Gets the sequential awareness information for a specific agent.
Args:
agent_name (str): The name of the agent to get awareness for.
Returns:
str: A string describing the agents ahead and behind in the sequence.
"""
if not self.flow or "->" not in self.flow:
return ""
tasks = self.flow.split("->")
return self._get_sequential_awareness(agent_name, tasks)
def get_sequential_flow_structure(self) -> str:
"""
Gets the overall sequential flow structure information.
Returns:
str: A string describing the complete sequential flow structure.
"""
return self._get_sequential_flow_info()
def _run(
self,
task: str = None,
img: str = None,
custom_tasks: Dict[str, str] = None,
*args,
**kwargs,
):
"""
Runs the swarm to rearrange the tasks.
Args:
task (str, optional): The initial task to be processed. Defaults to None.
img (str, optional): Image input for agents that support it. Defaults to None.
custom_tasks (Dict[str, str], optional): Custom tasks for specific agents. Defaults to None.
output_type (str, optional): Format of the output. Can be:
- "all": String containing all agent responses concatenated
- "final": Only the final agent's response
- "list": List of all agent responses
- "dict": Dict mapping agent names to their responses
Defaults to "final".
*args: Additional positional arguments
**kwargs: Additional keyword arguments
Returns:
Union[str, List[str], Dict[str, str]]: The processed output in the specified format
Raises:
ValueError: If flow validation fails
Exception: For any other errors during execution
"""
try:
self.conversation.add("User", task)
if not self.validate_flow():
logger.error("Flow validation failed")
return "Invalid flow configuration."
tasks = self.flow.split("->")
current_task = task
response_dict = {}
logger.info(
f"Starting task execution with {len(tasks)} steps"
)
# # Handle custom tasks
if custom_tasks is not None:
logger.info("Processing custom tasks")
c_agent_name, c_task = next(
iter(custom_tasks.items())
)
position = tasks.index(c_agent_name)
if position > 0:
tasks[position - 1] += "->" + c_task
else:
tasks.insert(position, c_task)
loop_count = 0
while loop_count < self.max_loops:
logger.info(
f"Starting loop {loop_count + 1}/{self.max_loops}"
)
for task_idx, task in enumerate(tasks):
agent_names = [
name.strip() for name in task.split(",")
]
if len(agent_names) > 1:
# Parallel processing
logger.info(
f"Running agents in parallel: {agent_names}"
)
for agent_name in agent_names:
agent = self.agents[agent_name]
# Set agent.streaming_on if no streaming_callback
if self.streaming_callback is not None:
agent.streaming_on = True
result = agent.run(
task=self.conversation.get_str(),
img=img,
*args,
**kwargs,
)
result = any_to_str(result)
# Call streaming callback with the result if provided
if self.streaming_callback:
self.streaming_callback(result)
self.conversation.add(
agent.agent_name, result
)
response_dict[agent_name] = result
logger.debug(
f"Agent {agent_name} output: {result}"
)
",".join(agent_names)
else:
# Sequential processing
logger.info(
f"Running agent sequentially: {agent_names[0]}"
)
agent_name = agent_names[0]
agent = self.agents[agent_name]
# Add sequential awareness information for the agent
awareness_info = (
self._get_sequential_awareness(
agent_name, tasks
)
)
if awareness_info:
self.conversation.add(
"system", awareness_info
)
logger.info(
f"Added sequential awareness for {agent_name}: {awareness_info}"
)
# Set agent.streaming_on if no streaming_callback
if self.streaming_callback is not None:
agent.streaming_on = True
current_task = agent.run(
task=self.conversation.get_str(),
img=img,
*args,
**kwargs,
)
current_task = any_to_str(current_task)
# Call streaming callback with the result if provided
if self.streaming_callback:
self.streaming_callback(current_task)
self.conversation.add(
agent.agent_name, current_task
)
response_dict[agent_name] = current_task
loop_count += 1
logger.info("Task execution completed")
return history_output_formatter(
conversation=self.conversation,
type=self.output_type,
)
except Exception as e:
self._catch_error(e)
def _catch_error(self, e: Exception):
if self.autosave is True:
log_agent_data(self.to_dict())
logger.error(
f"An error occurred with your swarm {self.name}: Error: {e} Traceback: {e.__traceback__}"
)
return e
def run(
self,
task: str = None,
img: str = None,
*args,
**kwargs,
):
"""
Execute the agent rearrangement task with specified compute resources.
Args:
task (str, optional): The task to execute. Defaults to None.
img (str, optional): Path to input image if required. Defaults to None.
*args: Additional positional arguments passed to _run().
**kwargs: Additional keyword arguments passed to _run().
Returns:
The result from executing the task through the cluster operations wrapper.
"""
try:
log_agent_data(self.to_dict())
out = self._run(
task=task,
img=img,
*args,
**kwargs,
)
log_agent_data(self.to_dict())
return out
except Exception as e:
self._catch_error(e)
def __call__(self, task: str, *args, **kwargs):
"""
Make the class callable by executing the run() method.
Args:
task (str): The task to execute.
*args: Additional positional arguments passed to run().
**kwargs: Additional keyword arguments passed to run().
Returns:
The result from executing run().
"""
try:
return self.run(task=task, *args, **kwargs)
except Exception as e:
logger.error(f"An error occurred: {e}")
return e
def batch_run(
self,
tasks: List[str],
img: Optional[List[str]] = None,
batch_size: int = 10,
*args,
**kwargs,
) -> List[str]:
"""
Process multiple tasks in batches.
Args:
tasks: List of tasks to process
img: Optional list of images corresponding to tasks
batch_size: Number of tasks to process simultaneously
device: Computing device to use
device_id: Specific device ID if applicable
all_cores: Whether to use all CPU cores
all_gpus: Whether to use all available GPUs
Returns:
List of results corresponding to input tasks
"""
try:
results = []
for i in range(0, len(tasks), batch_size):
batch_tasks = tasks[i : i + batch_size]
batch_imgs = (
img[i : i + batch_size]
if img
else [None] * len(batch_tasks)
)
# Process batch using concurrent execution
batch_results = [
self.run(
task=task,
img=img_path,
*args,
**kwargs,
)
for task, img_path in zip(batch_tasks, batch_imgs)
]
results.extend(batch_results)
return results
except Exception as e:
self._catch_error(e)
def concurrent_run(
self,
tasks: List[str],
img: Optional[List[str]] = None,
max_workers: Optional[int] = None,
*args,
**kwargs,
) -> List[str]:
"""
Process multiple tasks concurrently using ThreadPoolExecutor.
Args:
tasks: List of tasks to process
img: Optional list of images corresponding to tasks
max_workers: Maximum number of worker threads
device: Computing device to use
device_id: Specific device ID if applicable
all_cores: Whether to use all CPU cores
all_gpus: Whether to use all available GPUs
Returns:
List of results corresponding to input tasks
"""
try:
with ThreadPoolExecutor(
max_workers=max_workers
) as executor:
imgs = img if img else [None] * len(tasks)
futures = [
executor.submit(
self.run,
task=task,
img=img_path,
*args,
**kwargs,
)
for task, img_path in zip(tasks, imgs)
]
return [future.result() for future in futures]
except Exception as e:
self._catch_error(e)
def _serialize_callable(
self, attr_value: Callable
) -> Dict[str, Any]:
"""
Serializes callable attributes by extracting their name and docstring.
Args:
attr_value (Callable): The callable to serialize.
Returns:
Dict[str, Any]: Dictionary with name and docstring of the callable.
"""
return {
"name": getattr(
attr_value, "__name__", type(attr_value).__name__
),
"doc": getattr(attr_value, "__doc__", None),
}
def _serialize_attr(self, attr_name: str, attr_value: Any) -> Any:
"""
Serializes an individual attribute, handling non-serializable objects.
Args:
attr_name (str): The name of the attribute.
attr_value (Any): The value of the attribute.
Returns:
Any: The serialized value of the attribute.
"""
try:
if callable(attr_value):
return self._serialize_callable(attr_value)
elif hasattr(attr_value, "to_dict"):
return (
attr_value.to_dict()
) # Recursive serialization for nested objects
else:
json.dumps(
attr_value
) # Attempt to serialize to catch non-serializable objects
return attr_value
except (TypeError, ValueError):
return f"<Non-serializable: {type(attr_value).__name__}>"
def to_dict(self) -> Dict[str, Any]:
"""
Converts all attributes of the class, including callables, into a dictionary.
Handles non-serializable attributes by converting them or skipping them.
Returns:
Dict[str, Any]: A dictionary representation of the class attributes.
"""
return {
attr_name: self._serialize_attr(attr_name, attr_value)
for attr_name, attr_value in self.__dict__.items()
}
def rearrange(
name: str = None,
description: str = None,
agents: List[Agent] = None,
flow: str = None,
task: str = None,
img: str = None,
*args,
**kwargs,
):
"""
Rearranges the given list of agents based on the specified flow.
Parameters:
agents (List[Agent]): The list of agents to be rearranged.
flow (str): The flow used for rearranging the agents.
task (str, optional): The task to be performed during rearrangement. Defaults to None.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
The result of running the agent system with the specified task.
Example:
agents = [agent1, agent2, agent3]
flow = "agent1 -> agent2, agent3"
task = "Perform a task"
rearrange(agents, flow, task)
"""
agent_system = AgentRearrange(
name=name,
description=description,
agents=agents,
flow=flow,
*args,
**kwargs,
)
return agent_system.run(task=task, img=img)

@ -1,307 +0,0 @@
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, List, Optional, Union
from swarms.prompts.multi_agent_collab_prompt import (
MULTI_AGENT_COLLAB_PROMPT,
)
from swarms.structs.agent import Agent
from swarms.structs.agent_rearrange import AgentRearrange
from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.output_types import OutputType
logger = initialize_logger(log_folder="sequential_workflow")
class SequentialWorkflow:
"""
Orchestrates the execution of a sequence of agents in a defined workflow.
This class enables the construction and execution of a workflow where multiple agents
(or callables) are executed in a specified order, passing tasks and optional data
through the chain. It supports both synchronous and asynchronous execution, as well as
batched and concurrent task processing.
Attributes:
id (str): Unique identifier for the workflow instance.
name (str): Human-readable name for the workflow.
description (str): Description of the workflow's purpose.
agents (List[Union[Agent, Callable]]): List of agents or callables to execute in sequence.
max_loops (int): Maximum number of times to execute the workflow.
output_type (OutputType): Format of the output from the workflow.
shared_memory_system (callable): Optional callable for managing shared memory between agents.
multi_agent_collab_prompt (bool): Whether to append a collaborative prompt to each agent.
flow (str): String representation of the agent execution order.
agent_rearrange (AgentRearrange): Internal helper for managing agent execution.
Raises:
ValueError: If the agents list is None or empty, or if max_loops is set to 0.
"""
def __init__(
self,
id: str = "sequential_workflow",
name: str = "SequentialWorkflow",
description: str = "Sequential Workflow, where agents are executed in a sequence.",
agents: List[Union[Agent, Callable]] = None,
max_loops: int = 1,
output_type: OutputType = "dict",
shared_memory_system: callable = None,
multi_agent_collab_prompt: bool = True,
team_awareness: bool = False,
streaming_callback: Optional[Callable[[str], None]] = None,
*args,
**kwargs,
):
"""
Initialize a SequentialWorkflow instance.
Args:
id (str, optional): Unique identifier for the workflow. Defaults to "sequential_workflow".
name (str, optional): Name of the workflow. Defaults to "SequentialWorkflow".
description (str, optional): Description of the workflow. Defaults to a standard description.
agents (List[Union[Agent, Callable]], optional): List of agents or callables to execute in sequence.
max_loops (int, optional): Maximum number of times to execute the workflow. Defaults to 1.
output_type (OutputType, optional): Output format for the workflow. Defaults to "dict".
shared_memory_system (callable, optional): Callable for shared memory management. Defaults to None.
multi_agent_collab_prompt (bool, optional): If True, appends a collaborative prompt to each agent.
team_awareness (bool, optional): Whether to enable team awareness. Defaults to False.
streaming_callback (Optional[Callable[[str], None]], optional): Callback function to receive streaming tokens in real-time. Defaults to None.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Raises:
ValueError: If the agents list is None or empty, or if max_loops is set to 0.
"""
self.id = id
self.name = name
self.description = description
self.agents = agents
self.max_loops = max_loops
self.output_type = output_type
self.shared_memory_system = shared_memory_system
self.multi_agent_collab_prompt = multi_agent_collab_prompt
self.team_awareness = team_awareness
self.streaming_callback = streaming_callback
self.reliability_check()
self.flow = self.sequential_flow()
self.agent_rearrange = AgentRearrange(
name=self.name,
description=self.description,
agents=self.agents,
flow=self.flow,
max_loops=self.max_loops,
output_type=self.output_type,
team_awareness=self.team_awareness,
streaming_callback=self.streaming_callback,
*args,
**kwargs,
)
def reliability_check(self):
"""
Validates the workflow configuration and prepares agents for execution.
Raises:
ValueError: If the agents list is None or empty, or if max_loops is set to 0.
"""
if self.agents is None or len(self.agents) == 0:
raise ValueError("Agents list cannot be None or empty")
if self.max_loops == 0:
raise ValueError("max_loops cannot be 0")
if self.multi_agent_collab_prompt is True:
for agent in self.agents:
agent.system_prompt += MULTI_AGENT_COLLAB_PROMPT
logger.info(
f"Sequential Workflow Name: {self.name} is ready to run."
)
def sequential_flow(self):
"""
Constructs a string representation of the agent execution order.
Returns:
str: A string showing the order of agent execution (e.g., "AgentA -> AgentB -> AgentC").
Returns an empty string if no valid agent names are found.
"""
if self.agents:
agent_names = []
for agent in self.agents:
try:
# Try to get agent_name, fallback to name if not available
agent_name = (
getattr(agent, "agent_name", None)
or agent.name
)
agent_names.append(agent_name)
except AttributeError:
logger.warning(
f"Could not get name for agent {agent}"
)
continue
if agent_names:
flow = " -> ".join(agent_names)
else:
flow = ""
logger.warning(
"No valid agent names found to create flow"
)
else:
flow = ""
logger.warning("No agents provided to create flow")
return flow
def run(
self,
task: str,
img: Optional[str] = None,
imgs: Optional[List[str]] = None,
*args,
**kwargs,
):
"""
Executes a specified task through the agents in the dynamically constructed flow.
Args:
task (str): The task for the agents to execute.
img (Optional[str], optional): An optional image input for the agents.
imgs (Optional[List[str]], optional): Optional list of images for the agents.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
str: The final result after processing through all agents.
Raises:
ValueError: If the task is None or empty.
Exception: If any error occurs during task execution.
"""
try:
# prompt = f"{MULTI_AGENT_COLLAB_PROMPT}\n\n{task}"
return self.agent_rearrange.run(
task=task,
img=img,
streaming_callback=self.streaming_callback,
*args,
**kwargs,
)
except Exception as e:
logger.error(
f"An error occurred while executing the task: {e}"
)
raise e
def __call__(self, task: str, *args, **kwargs):
"""
Allows the SequentialWorkflow instance to be called as a function.
Args:
task (str): The task for the agents to execute.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
str: The final result after processing through all agents.
"""
return self.run(task, *args, **kwargs)
def run_batched(self, tasks: List[str]) -> List[str]:
"""
Executes a batch of tasks through the agents in the dynamically constructed flow.
Args:
tasks (List[str]): A list of tasks for the agents to execute.
Returns:
List[str]: A list of final results after processing through all agents.
Raises:
ValueError: If tasks is None, empty, or contains non-string elements.
Exception: If any error occurs during task execution.
"""
if not tasks or not all(
isinstance(task, str) for task in tasks
):
raise ValueError(
"Tasks must be a non-empty list of strings"
)
try:
return [self.agent_rearrange.run(task) for task in tasks]
except Exception as e:
logger.error(
f"An error occurred while executing the batch of tasks: {e}"
)
raise
async def run_async(self, task: str) -> str:
"""
Executes the specified task through the agents in the dynamically constructed flow asynchronously.
Args:
task (str): The task for the agents to execute.
Returns:
str: The final result after processing through all agents.
Raises:
ValueError: If task is None or not a string.
Exception: If any error occurs during task execution.
"""
if not task or not isinstance(task, str):
raise ValueError("Task must be a non-empty string")
try:
return await self.agent_rearrange.run_async(task)
except Exception as e:
logger.error(
f"An error occurred while executing the task asynchronously: {e}"
)
raise
async def run_concurrent(self, tasks: List[str]) -> List[str]:
"""
Executes a batch of tasks through the agents in the dynamically constructed flow concurrently.
Args:
tasks (List[str]): A list of tasks for the agents to execute.
Returns:
List[str]: A list of final results after processing through all agents.
Raises:
ValueError: If tasks is None, empty, or contains non-string elements.
Exception: If any error occurs during task execution.
"""
if not tasks or not all(
isinstance(task, str) for task in tasks
):
raise ValueError(
"Tasks must be a non-empty list of strings"
)
try:
with ThreadPoolExecutor(
max_workers=os.cpu_count()
) as executor:
results = [
executor.submit(self.agent_rearrange.run, task)
for task in tasks
]
return [
result.result()
for result in as_completed(results)
]
except Exception as e:
logger.error(
f"An error occurred while executing the batch of tasks concurrently: {e}"
)
raise
Loading…
Cancel
Save