[IMPROVEMENT][AgentLoader with markdown, csv, and other capaibilties] [fix examples] [swarms.utils.agent_loader -> MarkdownAgentLoader] [util][swarm_id] [Improve SequentialWorkflowPrompting]

pull/966/merge
Kye Gomez 1 week ago
parent bff7a18047
commit 7ddfab0603

@ -1,6 +1,8 @@
from swarms.structs.auto_swarm_builder import AutoSwarmBuilder
import orjson
from dotenv import load_dotenv
from swarms.structs.auto_swarm_builder import AutoSwarmBuilder
load_dotenv()
swarm = AutoSwarmBuilder(
@ -8,10 +10,11 @@ swarm = AutoSwarmBuilder(
description="My Swarm Description",
verbose=True,
max_loops=1,
return_agents=True,
)
result = swarm.run(
task="Build a swarm to write a research paper on the topic of AI"
)
print(result)
print(orjson.dumps(result, option=orjson.OPT_INDENT_2).decode())

@ -1,6 +1,6 @@
# `AgentRearrange` Class
The `AgentRearrange` class represents a swarm of agents for rearranging tasks. It allows you to create a swarm of agents, add or remove agents from the swarm, and run the swarm to process tasks based on a specified flow pattern.
The `AgentRearrange` class represents a swarm of agents for rearranging tasks. It allows you to create a swarm of agents, add or remove agents from the swarm, and run the swarm to process tasks based on a specified flow pattern. The class now includes **sequential awareness** features that allow agents to know about the agents ahead and behind them in sequential flows.
## Attributes
----------
@ -17,26 +17,38 @@ The `AgentRearrange` class represents a swarm of agents for rearranging tasks. I
| `memory_system` | `BaseVectorDatabase` | Memory system for storing agent interactions |
| `human_in_the_loop` | `bool` | Whether human intervention is enabled |
| `custom_human_in_the_loop` | `Callable` | Custom function for human intervention |
| `return_json` | `bool` | Whether to return output in JSON format |
| `output_type` | `OutputType` | Format of output ("all", "final", "list", or "dict") |
| `docs` | `List[str]` | List of document paths to add to agent prompts |
| `doc_folder` | `str` | Folder path containing documents to add to agent prompts |
| `swarm_history` | `dict` | History of agent interactions |
| `autosave` | `bool` | Whether to automatically save agent data |
| `rules` | `str` | Custom rules to add to the conversation |
| `team_awareness` | `bool` | Whether to enable team awareness and sequential flow information |
| `time_enabled` | `bool` | Whether to enable timestamps in conversation |
| `message_id_on` | `bool` | Whether to enable message IDs in conversation |
## Methods
-------
### `__init__(self, agents: List[Agent] = None, flow: str = None, max_loops: int = 1, verbose: bool = True)`
### `__init__(self, id: str = swarm_id(), name: str = "AgentRearrange", description: str = "A swarm of agents for rearranging tasks.", agents: List[Union[Agent, Callable]] = None, flow: str = None, max_loops: int = 1, verbose: bool = True, memory_system: Any = None, human_in_the_loop: bool = False, custom_human_in_the_loop: Optional[Callable[[str], str]] = None, output_type: OutputType = "all", autosave: bool = True, rules: str = None, team_awareness: bool = False, time_enabled: bool = False, message_id_on: bool = False, *args, **kwargs)`
Initializes the `AgentRearrange` object.
Initializes the `AgentRearrange` object with enhanced sequential awareness capabilities.
| Parameter | Type | Description |
| --- | --- | --- |
| `agents` | `List[Agent]` (optional) | A list of `Agent` objects. Defaults to `None`. |
| `id` | `str` (optional) | Unique identifier for the swarm. Defaults to auto-generated UUID. |
| `name` | `str` (optional) | Name of the swarm. Defaults to "AgentRearrange". |
| `description` | `str` (optional) | Description of the swarm's purpose. Defaults to "A swarm of agents for rearranging tasks.". |
| `agents` | `List[Union[Agent, Callable]]` (optional) | A list of `Agent` objects or callables. Defaults to `None`. |
| `flow` | `str` (optional) | The flow pattern of the tasks. Defaults to `None`. |
| `max_loops` | `int` (optional) | The maximum number of loops for the agents to run. Defaults to `1`. |
| `verbose` | `bool` (optional) | Whether to enable verbose logging or not. Defaults to `True`. |
| `memory_system` | `Any` (optional) | Memory system for storing agent interactions. Defaults to `None`. |
| `human_in_the_loop` | `bool` (optional) | Whether human intervention is enabled. Defaults to `False`. |
| `custom_human_in_the_loop` | `Callable[[str], str]` (optional) | Custom function for human intervention. Defaults to `None`. |
| `output_type` | `OutputType` (optional) | Format of output. Defaults to `"all"`. |
| `autosave` | `bool` (optional) | Whether to automatically save agent data. Defaults to `True`. |
| `rules` | `str` (optional) | Custom rules to add to the conversation. Defaults to `None`. |
| `team_awareness` | `bool` (optional) | Whether to enable team awareness and sequential flow information. Defaults to `False`. |
| `time_enabled` | `bool` (optional) | Whether to enable timestamps in conversation. Defaults to `False`. |
| `message_id_on` | `bool` (optional) | Whether to enable message IDs in conversation. Defaults to `False`. |
### `add_agent(self, agent: Agent)`
@ -74,54 +86,130 @@ Validates the flow pattern.
- `bool`: `True` if the flow pattern is valid.
### `run(self, task: str = None, img: str = None, device: str = "cpu", device_id: int = 1, all_cores: bool = True, all_gpus: bool = False, *args, **kwargs)`
### **Sequential Awareness Methods**
#### `get_agent_sequential_awareness(self, agent_name: str) -> str`
Gets the sequential awareness information for a specific agent, showing which agents come before and after in the sequence.
| Parameter | Type | Description |
| --- | --- | --- |
| `agent_name` | `str` | The name of the agent to get awareness for. |
**Returns:**
- `str`: A string describing the agents ahead and behind in the sequence.
**Example:**
```python
awareness = agent_system.get_agent_sequential_awareness("Agent2")
# Returns: "Sequential awareness: Agent ahead: Agent1 | Agent behind: Agent3"
```
#### `get_sequential_flow_structure(self) -> str`
Gets the overall sequential flow structure information showing the complete workflow with relationships between agents.
**Returns:**
- `str`: A string describing the complete sequential flow structure.
**Example:**
```python
flow_structure = agent_system.get_sequential_flow_structure()
# Returns: "Sequential Flow Structure:
# Step 1: Agent1
# Step 2: Agent2 (follows: Agent1) (leads to: Agent3)
# Step 3: Agent3 (follows: Agent2)"
```
### `run(self, task: str = None, img: str = None, *args, **kwargs)`
Executes the agent rearrangement task with specified compute resources.
| Parameter | Type | Description |
| --- | --- | --- |
| `task` | `str` | The task to execute |
| `img` | `str` | Path to input image if required |
| `device` | `str` | Computing device to use ('cpu' or 'gpu') |
| `device_id` | `int` | ID of specific device to use |
| `all_cores` | `bool` | Whether to use all CPU cores |
| `all_gpus` | `bool` | Whether to use all available GPUs |
| `task` | `str` (optional) | The task to execute. Defaults to `None`. |
| `img` | `str` (optional) | Path to input image if required. Defaults to `None`. |
| `*args` | - | Additional positional arguments passed to `_run()`. |
| `**kwargs` | - | Additional keyword arguments passed to `_run()`. |
**Returns:**
- `str`: The final processed task.
- The result from executing the task through the cluster operations wrapper.
### `batch_run(self, tasks: List[str], img: Optional[List[str]] = None, batch_size: int = 10, device: str = "cpu", device_id: int = None, all_cores: bool = True, all_gpus: bool = False, *args, **kwargs)`
### `batch_run(self, tasks: List[str], img: Optional[List[str]] = None, batch_size: int = 10, *args, **kwargs)`
Process multiple tasks in batches.
| Parameter | Type | Description |
| --- | --- | --- |
| `tasks` | `List[str]` | List of tasks to process |
| `img` | `List[str]` | Optional list of images corresponding to tasks |
| `img` | `List[str]` (optional) | Optional list of images corresponding to tasks |
| `batch_size` | `int` | Number of tasks to process simultaneously |
| `device` | `str` | Computing device to use |
| `device_id` | `int` | Specific device ID if applicable |
| `all_cores` | `bool` | Whether to use all CPU cores |
| `all_gpus` | `bool` | Whether to use all available GPUs |
| `*args` | - | Additional positional arguments |
| `**kwargs` | - | Additional keyword arguments |
**Returns:**
- `List[str]`: List of results corresponding to input tasks
### `concurrent_run(self, tasks: List[str], img: Optional[List[str]] = None, max_workers: Optional[int] = None, device: str = "cpu", device_id: int = None, all_cores: bool = True, all_gpus: bool = False, *args, **kwargs)`
### `concurrent_run(self, tasks: List[str], img: Optional[List[str]] = None, max_workers: Optional[int] = None, *args, **kwargs)`
Process multiple tasks concurrently using ThreadPoolExecutor.
| Parameter | Type | Description |
| --- | --- | --- |
| `tasks` | `List[str]` | List of tasks to process |
| `img` | `List[str]` | Optional list of images corresponding to tasks |
| `max_workers` | `int` | Maximum number of worker threads |
| `device` | `str` | Computing device to use |
| `device_id` | `int` | Specific device ID if applicable |
| `all_cores` | `bool` | Whether to use all CPU cores |
| `all_gpus` | `bool` | Whether to use all available GPUs |
| `img` | `List[str]` (optional) | Optional list of images corresponding to tasks |
| `max_workers` | `int` (optional) | Maximum number of worker threads |
| `*args` | - | Additional positional arguments |
| `**kwargs` | - | Additional keyword arguments |
**Returns:**
- `List[str]`: List of results corresponding to input tasks
## **Sequential Awareness Feature**
The `AgentRearrange` class now includes a powerful **sequential awareness** feature that enhances agent collaboration in sequential workflows. When agents are executed sequentially, they automatically receive information about:
- **Agent ahead**: The agent that completed their task before them
- **Agent behind**: The agent that will receive their output next
This feature is automatically enabled when using sequential flows and provides agents with context about their position in the workflow, improving coordination and task understanding.
### How It Works
1. **Automatic Detection**: The system automatically detects when agents are running sequentially vs. in parallel
2. **Context Injection**: Before each sequential agent runs, awareness information is added to the conversation
3. **Enhanced Collaboration**: Agents can reference previous agents' work and prepare output for the next agent
### Example with Sequential Awareness
```python
from swarms import Agent, AgentRearrange
# Create agents
agent1 = Agent(agent_name="Researcher", system_prompt="Research the topic")
agent2 = Agent(agent_name="Writer", system_prompt="Write based on research")
agent3 = Agent(agent_name="Editor", system_prompt="Edit the written content")
# Create sequential workflow
workflow = AgentRearrange(
agents=[agent1, agent2, agent3],
flow="Researcher -> Writer -> Editor",
team_awareness=True # Enables sequential awareness
)
# Run the workflow
result = workflow.run("Research and write about artificial intelligence")
```
**What happens automatically:**
- **Researcher** runs first (no awareness info needed)
- **Writer** receives: "Sequential awareness: Agent ahead: Researcher | Agent behind: Editor"
- **Editor** receives: "Sequential awareness: Agent ahead: Writer"
## Documentation for `rearrange` Function
======================================
@ -133,9 +221,12 @@ The `rearrange` function is a helper function that rearranges the given list of
| Parameter | Type | Description |
| --- | --- | --- |
| `name` | `str` (optional) | Name for the agent system. Defaults to `None`. |
| `description` | `str` (optional) | Description for the agent system. Defaults to `None`. |
| `agents` | `List[Agent]` | The list of agents to be rearranged. |
| `flow` | `str` | The flow used for rearranging the agents. |
| `task` | `str` (optional) | The task to be performed during rearrangement. Defaults to `None`. |
| `img` | `str` (optional) | Path to input image if required. Defaults to `None`. |
| `*args` | - | Additional positional arguments. |
| `**kwargs` | - | Additional keyword arguments. |
@ -157,7 +248,7 @@ rearrange(agents, flow, task)
### Example Usage
-------------
Here's an example of how to use the `AgentRearrange` class and the `rearrange` function:
Here's an example of how to use the `AgentRearrange` class and the `rearrange` function with the new sequential awareness features:
```python
from swarms import Agent, AgentRearrange
@ -211,20 +302,35 @@ agents = [director, worker1, worker2]
# Define the flow pattern
flow = "Accounting Director -> Accountant 1 -> Accountant 2"
# Using AgentRearrange class
agent_system = AgentRearrange(agents=agents, flow=flow)
# Using AgentRearrange class with sequential awareness
agent_system = AgentRearrange(
agents=agents,
flow=flow,
team_awareness=True, # Enables sequential awareness
time_enabled=True, # Enable timestamps
message_id_on=True # Enable message IDs
)
# Get sequential flow information
flow_structure = agent_system.get_sequential_flow_structure()
print("Flow Structure:", flow_structure)
# Get awareness for specific agents
worker1_awareness = agent_system.get_agent_sequential_awareness("Accountant 1")
print("Worker1 Awareness:", worker1_awareness)
# Run the workflow
output = agent_system.run("Process monthly financial statements")
print(output)
```
In this example, we first initialize three agents: `director`, `worker1`, and `worker2`. Then, we create a list of these agents and define the flow pattern `"Director -> Worker1 -> Worker2"`.
We can use the `AgentRearrange` class by creating an instance of it with the list of agents and the flow pattern. We then call the `run` method with the initial task, and it will execute the agents in the specified order, passing the output of one agent as the input to the next agent.
Alternatively, we can use the `rearrange` function by passing the list of agents, the flow pattern, and the initial task as arguments.
Both the `AgentRearrange` class and the `rearrange` function will return the final output after processing the task through the agents according to the specified flow pattern.
The new sequential awareness features provide:
- **Automatic context**: Each agent knows who came before and who comes after
- **Better coordination**: Agents can reference previous work and prepare for next steps
- **Flow visualization**: You can see the complete workflow structure
- **Enhanced logging**: Better tracking of agent interactions
## Error Handling
--------------
@ -242,63 +348,69 @@ output = agent_system.run("Some task")`
This will raise a `ValueError` with the message `"Agent 'Worker3' is not registered."`.
## Parallel and Sequential Processing
----------------------------------
The `AgentRearrange` class supports both parallel and sequential processing of tasks based on the specified flow pattern. If the flow pattern includes multiple agents separated by commas (e.g., `"agent1, agent2"`), the agents will be executed in parallel, and their outputs will be concatenated with a semicolon (`;`). If the flow pattern includes a single agent, it will be executed sequentially.
The `AgentRearrange` class supports both parallel and sequential processing of tasks based on the specified flow pattern. If the flow pattern includes multiple agents separated by commas (e.g., `"agent1, agent2"`), the agents will be executed in parallel, and their outputs will be concatenated. If the flow pattern includes a single agent, it will be executed sequentially with enhanced awareness.
### Parallel processing
`parallel_flow = "Worker1, Worker2 -> Director"`
### Sequential processing
### Sequential processing with awareness
`sequential_flow = "Worker1 -> Worker2 -> Director"`
In the `parallel_flow` example, `Worker1` and `Worker2` will be executed in parallel, and their outputs will be concatenated and passed to `Director`. In the `sequential_flow` example, `Worker1` will be executed first, and its output will be passed to `Worker2`, and then the output of `Worker2` will be passed to `Director`.
In the `parallel_flow` example, `Worker1` and `Worker2` will be executed in parallel, and their outputs will be concatenated and passed to `Director`.
## Logging
-------
In the `sequential_flow` example, `Worker1` will be executed first, then `Worker2` will receive awareness that `Worker1` came before and `Director` comes after, and finally `Director` will receive awareness that `Worker2` came before.
The `AgentRearrange` class includes logging capabilities using the `loguru` library. If `verbose` is set to `True` during initialization, a log file named `agent_rearrange.log` will be created, and log messages will be written to it. You can use this log file to track the execution of the agents and any potential issues or errors that may occur.
## Logging and Monitoring
-------
The `AgentRearrange` class includes comprehensive logging capabilities using the `loguru` library. The new sequential awareness features add enhanced logging:
```bash
2023-05-08 10:30:15.456 | INFO | agent_rearrange:__init__:34 - Adding agent Director to the swarm.
2023-05-08 10:30:15.457 | INFO | agent_rearrange:__init__:34 - Adding agent Worker1 to the swarm.
2023-05-08 10:30:15.457 | INFO | agent_rearrange:__init__:34 - Adding agent Worker2 to the swarm.
2023-05-08 10:30:15.458 | INFO | agent_rearrange:run:118 - Running agents in parallel: ['Worker1', 'Worker2']
2023-05-08 10:30:15.459 | INFO | agent_rearrange:run:121 - Running agents sequentially: ['Director']`
2023-05-08 10:30:15.459 | INFO | agent_rearrange:run:121 - Running agents sequentially: ['Director']
2023-05-08 10:30:15.460 | INFO | agent_rearrange:run:125 - Added sequential awareness for Worker2: Sequential awareness: Agent ahead: Worker1 | Agent behind: Director
```
## Additional Parameters
---------------------
The `AgentRearrange` class also accepts additional parameters that can be passed to the `run` method using `*args` and `**kwargs`. These parameters will be forwarded to the individual agents during execution.
`agent_system = AgentRearrange(agents=agents, flow=flow)`
`output = agent_system.run("Some task", max_tokens=200, temperature=0.7)`
The `AgentRearrange` class now accepts additional parameters for enhanced functionality:
In this example, the `max_tokens` and `temperature` parameters will be passed to each agent during execution.
```python
agent_system = AgentRearrange(
agents=agents,
flow=flow,
team_awareness=True, # Enable sequential awareness
time_enabled=True, # Enable conversation timestamps
message_id_on=True, # Enable message IDs
verbose=True # Enable detailed logging
)
```
## Customization
-------------
The `AgentRearrange` class and the `rearrange` function can be customized and extended to suit specific use cases. For example, you can create custom agents by inheriting from the `Agent` class and implementing custom logic for task processing. You can then add these custom agents to the swarm and define the flow pattern accordingly.
Additionally, you can modify the `run` method of the `AgentRearrange` class to implement custom logic for task processing and agent interaction.
The `AgentRearrange` class and the `rearrange` function can be customized and extended to suit specific use cases. The new sequential awareness features provide a foundation for building more sophisticated agent coordination systems.
## Limitations
-----------
It's important to note that the `AgentRearrange` class and the `rearrange` function rely on the individual agents to process tasks correctly. The quality of the output will depend on the capabilities and configurations of the agents used in the swarm. Additionally, the `AgentRearrange` class does not provide any mechanisms for task prioritization or load balancing among the agents.
It's important to note that the `AgentRearrange` class and the `rearrange` function rely on the individual agents to process tasks correctly. The quality of the output will depend on the capabilities and configurations of the agents used in the swarm.
The sequential awareness feature works best with agents that can understand and utilize context about their position in the workflow.
## Conclusion
----------
The `AgentRearrange` class and the `rearrange` function provide a flexible and extensible framework for orchestrating swarms of agents to process tasks based on a specified flow pattern. By combining the capabilities of individual agents, you can create complex workflows and leverage the strengths of different agents to tackle various tasks efficiently.
The `AgentRearrange` class and the `rearrange` function provide a flexible and extensible framework for orchestrating swarms of agents to process tasks based on a specified flow pattern. The new **sequential awareness** features significantly enhance agent collaboration by providing context about workflow relationships.
By combining the capabilities of individual agents with enhanced awareness of their position in the workflow, you can create more intelligent and coordinated multi-agent systems that understand not just their individual tasks, but also their role in the larger workflow.
While the current implementation offers basic functionality for agent rearrangement, there is room for future improvements and customizations to enhance the system's capabilities and cater to more specific use cases.
Whether you're working on natural language processing tasks, data analysis, or any other domain where agent-based systems can be beneficial, the enhanced `AgentRearrange` class provides a solid foundation for building sophisticated swarm-based solutions with improved coordination and context awareness.
Whether you're working on natural language processing tasks, data analysis, or any other domain where agent-based systems can be beneficial, the `AgentRearrange` class and the `rearrange` function provide a solid foundation for building and experimenting with swarm-based solutions.

@ -1,44 +1,77 @@
# SequentialWorkflow Documentation
**Overview:**
A Sequential Swarm architecture processes tasks in a linear sequence. Each agent completes its task before passing the result to the next agent in the chain. This architecture ensures orderly processing and is useful when tasks have dependencies. [Learn more here in the docs:](https://docs.swarms.world/en/latest/swarms/structs/agent_rearrange/)
A Sequential Swarm architecture processes tasks in a linear sequence. Each agent completes its task before passing the result to the next agent in the chain. This architecture ensures orderly processing and is useful when tasks have dependencies. The system now includes **sequential awareness** features that allow agents to know about the agents ahead and behind them in the workflow, significantly enhancing coordination and context understanding. [Learn more here in the docs:](https://docs.swarms.world/en/latest/swarms/structs/agent_rearrange/)
**Use-Cases:**
- Workflows where each step depends on the previous one, such as assembly lines or sequential data processing.
- Scenarios requiring strict order of operations.
- **NEW**: Enhanced workflows where agents need context about their position in the sequence for better coordination.
```mermaid
graph TD
A[First Agent] --> B[Second Agent]
B --> C[Third Agent]
C --> D[Fourth Agent]
style A fill:#e1f5fe
style B fill:#f3e5f5
style C fill:#e8f5e8
style D fill:#fff3e0
A -.->|"Awareness: None (first)"| A
B -.->|"Awareness: Ahead: A, Behind: C"| B
C -.->|"Awareness: Ahead: B, Behind: D"| C
D -.->|"Awareness: Ahead: C, Behind: None (last)"| D
```
## **Sequential Awareness Feature**
The SequentialWorkflow now includes a powerful **sequential awareness** feature that automatically provides each agent with context about their position in the workflow:
### What Agents Know Automatically
- **Agent ahead**: The agent that completed their task before them
- **Agent behind**: The agent that will receive their output next
- **Workflow position**: Their step number and role in the sequence
### Benefits
1. **Better Coordination**: Agents can reference previous work and prepare output for the next step
2. **Context Understanding**: Each agent knows their role in the larger workflow
3. **Improved Quality**: Output is tailored for the next agent in the sequence
4. **Enhanced Logging**: Better tracking of agent interactions and workflow progress
## Attributes
| Attribute | Type | Description |
|------------------|---------------|--------------------------------------------------|
| `agents` | `List[Agent]` | The list of agents in the workflow. |
| `flow` | `str` | A string representing the order of agents. |
| `agent_rearrange`| `AgentRearrange` | Manages the dynamic execution of agents. |
| `agent_rearrange`| `AgentRearrange` | Manages the dynamic execution of agents with sequential awareness. |
| `team_awareness` | `bool` | **NEW**: Enables sequential awareness features. Defaults to `False`. |
| `time_enabled` | `bool` | **NEW**: Enables timestamps in conversation. Defaults to `False`. |
| `message_id_on` | `bool` | **NEW**: Enables message IDs in conversation. Defaults to `False`. |
## Methods
### `__init__(self, agents: List[Agent] = None, max_loops: int = 1, *args, **kwargs)`
### `__init__(self, agents: List[Agent] = None, max_loops: int = 1, team_awareness: bool = False, time_enabled: bool = False, message_id_on: bool = False, *args, **kwargs)`
The constructor initializes the `SequentialWorkflow` object.
The constructor initializes the `SequentialWorkflow` object with enhanced sequential awareness capabilities.
- **Parameters:**
- `agents` (`List[Agent]`, optional): The list of agents in the workflow. Defaults to `None`.
- `max_loops` (`int`, optional): The maximum number of loops to execute the workflow. Defaults to `1`.
- `team_awareness` (`bool`, optional): **NEW**: Enables sequential awareness features. Defaults to `False`.
- `time_enabled` (`bool`, optional): **NEW**: Enables timestamps in conversation. Defaults to `False`.
- `message_id_on` (`bool`, optional): **NEW**: Enables message IDs in conversation. Defaults to `False`.
- `*args`: Variable length argument list.
- `**kwargs`: Arbitrary keyword arguments.
### `run(self, task: str) -> str`
Runs the specified task through the agents in the dynamically constructed flow.
Runs the specified task through the agents in the dynamically constructed flow with enhanced sequential awareness.
- **Parameters:**
- `task` (`str`): The task for the agents to execute.
@ -46,10 +79,28 @@ Runs the specified task through the agents in the dynamically constructed flow.
- **Returns:**
- `str`: The final result after processing through all agents.
## **Usage Example:**
### **NEW: Sequential Awareness Methods**
```python
#### `get_agent_sequential_awareness(self, agent_name: str) -> str`
Gets the sequential awareness information for a specific agent, showing which agents come before and after in the sequence.
- **Parameters:**
- `agent_name` (`str`): The name of the agent to get awareness for.
- **Returns:**
- `str`: A string describing the agents ahead and behind in the sequence.
#### `get_sequential_flow_structure(self) -> str`
Gets the overall sequential flow structure information showing the complete workflow with relationships between agents.
- **Returns:**
- `str`: A string describing the complete sequential flow structure.
## **Usage Example with Sequential Awareness:**
```python
from swarms import Agent, SequentialWorkflow
# Initialize agents for individual tasks
@ -65,33 +116,168 @@ agent2 = Agent(
model_name="gpt-4o",
max_loops=1,
)
agent3 = Agent(
agent_name="ICD-10 Code Validator",
system_prompt="Validate and finalize the ICD-10 code recommendations.",
model_name="gpt-4o",
max_loops=1,
)
# Create the Sequential workflow
# Create the Sequential workflow with enhanced awareness
workflow = SequentialWorkflow(
agents=[agent1, agent2], max_loops=1, verbose=False
agents=[agent1, agent2, agent3],
max_loops=1,
verbose=False,
team_awareness=True, # Enable sequential awareness
time_enabled=True, # Enable timestamps
message_id_on=True # Enable message IDs
)
# Get workflow structure information
flow_structure = workflow.get_sequential_flow_structure()
print("Workflow Structure:")
print(flow_structure)
# Get awareness for specific agents
analyzer_awareness = workflow.get_agent_sequential_awareness("ICD-10 Code Analyzer")
summarizer_awareness = workflow.get_agent_sequential_awareness("ICD-10 Code Summarizer")
validator_awareness = workflow.get_agent_sequential_awareness("ICD-10 Code Validator")
print(f"\nAnalyzer Awareness: {analyzer_awareness}")
print(f"Summarizer Awareness: {summarizer_awareness}")
print(f"Validator Awareness: {validator_awareness}")
# Run the workflow
workflow.run(
result = workflow.run(
"Analyze the medical report and provide the appropriate ICD-10 codes."
)
print(f"\nFinal Result: {result}")
```
**Expected Output:**
```
Workflow Structure:
Sequential Flow Structure:
Step 1: ICD-10 Code Analyzer
Step 2: ICD-10 Code Summarizer (follows: ICD-10 Code Analyzer) (leads to: ICD-10 Code Validator)
Step 3: ICD-10 Code Validator (follows: ICD-10 Code Summarizer)
Analyzer Awareness:
Summarizer Awareness: Sequential awareness: Agent ahead: ICD-10 Code Analyzer | Agent behind: ICD-10 Code Validator
Validator Awareness: Sequential awareness: Agent ahead: ICD-10 Code Summarizer
```
## **How Sequential Awareness Works**
### 1. **Automatic Context Injection**
When `team_awareness=True`, the system automatically adds awareness information to each agent's conversation context before they run:
This example initializes a `SequentialWorkflow` with three agents and executes a task, printing the final result.
- **First Agent**: No awareness info (starts the workflow)
- **Middle Agents**: Receive info about both the agent ahead and behind
- **Last Agent**: Receives info about the agent ahead only
### 2. **Enhanced Agent Prompts**
Each agent receives context like:
```
Sequential awareness: Agent ahead: ICD-10 Code Analyzer | Agent behind: ICD-10 Code Validator
```
### 3. **Improved Coordination**
Agents can now:
- Reference previous work more effectively
- Prepare output specifically for the next agent
- Understand their role in the larger workflow
- Provide better context for subsequent steps
## **Advanced Usage Examples**
### **Example 1: Research → Analysis → Report Workflow**
```python
# Create specialized agents
researcher = Agent(
agent_name="Researcher",
system_prompt="Conduct thorough research on the given topic."
)
analyzer = Agent(
agent_name="Data Analyzer",
system_prompt="Analyze research data and identify key insights."
)
reporter = Agent(
agent_name="Report Writer",
system_prompt="Write comprehensive reports based on analysis."
)
# Create workflow with awareness
workflow = SequentialWorkflow(
agents=[researcher, analyzer, reporter],
team_awareness=True,
time_enabled=True
)
# Run with enhanced coordination
result = workflow.run("Research and analyze the impact of AI on healthcare")
```
### **Example 2: Code Review Workflow**
```python
# Create code review agents
linter = Agent(
agent_name="Code Linter",
system_prompt="Check code for syntax errors and style violations."
)
reviewer = Agent(
agent_name="Code Reviewer",
system_prompt="Review code quality and suggest improvements."
)
tester = Agent(
agent_name="Code Tester",
system_prompt="Write and run tests for the reviewed code."
)
# Create workflow
workflow = SequentialWorkflow(
agents=[linter, reviewer, tester],
team_awareness=True
)
# Run code review process
result = workflow.run("Review and test the authentication module")
```
## **Notes:**
- Logs the task execution process and handles any exceptions that occur during the task execution.
- **Enhanced Logging**: The workflow now logs sequential awareness information for better debugging and monitoring.
- **Automatic Context**: No manual configuration needed - awareness is automatically provided when `team_awareness=True`.
- **Backward Compatibility**: Existing workflows continue to work without changes.
- **Performance**: Sequential awareness adds minimal overhead while significantly improving coordination.
### Logging and Error Handling
The `run` method includes logging to track the execution flow and captures errors to provide detailed information in case of failures. This is crucial for debugging and ensuring smooth operation of the workflow.
The `run` method now includes enhanced logging to track the sequential awareness flow and captures detailed information about agent interactions:
```bash
2023-05-08 10:30:15.456 | INFO | SequentialWorkflow:run:45 - Starting sequential workflow execution
2023-05-08 10:30:15.457 | INFO | SequentialWorkflow:run:52 - Added sequential awareness for ICD-10 Code Summarizer: Sequential awareness: Agent ahead: ICD-10 Code Analyzer | Agent behind: ICD-10 Code Validator
2023-05-08 10:30:15.458 | INFO | SequentialWorkflow:run:52 - Added sequential awareness for ICD-10 Code Validator: Sequential awareness: Agent ahead: ICD-10 Code Summarizer
```
## Additional Tips
- Ensure that the agents provided to the `SequentialWorkflow` are properly initialized and configured to handle the tasks they will receive.
- **Enable Team Awareness**: Set `team_awareness=True` to unlock the full potential of sequential coordination.
- **Use Descriptive Agent Names**: Clear agent names make the awareness information more useful.
- **Monitor Logs**: Enhanced logging provides insights into how agents are coordinating.
- **Iterative Improvement**: Use the awareness features to refine agent prompts and improve workflow quality.
## **Benefits of Sequential Awareness**
- The `max_loops` parameter can be used to control how many times the workflow should be executed, which is useful for iterative processes.
1. **Improved Quality**: Agents produce better output when they understand their context
2. **Better Coordination**: Reduced redundancy and improved handoffs between agents
3. **Enhanced Debugging**: Clear visibility into agent interactions and workflow progress
4. **Scalable Workflows**: Easy to add new agents while maintaining coordination
5. **Professional Workflows**: Mimics real-world team collaboration patterns
- Utilize the logging information to monitor and debug the task execution process.
The SequentialWorkflow with sequential awareness represents a significant advancement in multi-agent coordination, enabling more sophisticated and professional workflows that closely mirror human team collaboration patterns.

@ -1,66 +1,114 @@
# AgentLoader Documentation
The `AgentLoader` is a powerful utility for creating Swarms agents from markdown files using the Claude Code sub-agent format. It supports both single and multiple markdown file loading, providing a flexible way to define and deploy agents using YAML frontmatter configuration.
The `AgentLoader` is a comprehensive utility for creating Swarms agents from various file formats including Markdown, YAML, and CSV files. It provides a unified interface for loading agents with support for concurrent processing, configuration overrides, and automatic file type detection.
## Overview
The AgentLoader enables you to:
- Load single agents from markdown files with YAML frontmatter
- Load multiple agents from directories or file lists with concurrent processing
- Parse Claude Code sub-agent YAML frontmatter configurations
- Extract system prompts from markdown content
- Utilize 100% CPU cores for high-performance batch loading
- Provide comprehensive error handling and validation
- Load agents from Markdown files
- Load agents from YAML configuration files
- Load agents from CSV files
- Automatically detect file types and use appropriate loaders
- Process multiple files concurrently for improved performance
- Override default configurations with custom parameters
- Handle various agent configurations and settings
## Installation
The AgentLoader is included with the Swarms framework:
```python
from swarms import AgentLoader, load_agent_from_markdown, load_agents_from_markdown
from swarms.structs import AgentLoader
from swarms.utils import load_agent_from_markdown, load_agents_from_markdown
```
## Markdown Format
## Supported File Formats
The AgentLoader uses the Claude Code sub-agent YAML frontmatter format:
### 1. Markdown Files (Claude Code Format)
The primary format uses YAML frontmatter with markdown content:
```markdown
---
name: your-sub-agent-name
description: Description of when this subagent should be invoked
model_name: gpt-4
temperature: 0.3
max_loops: 2
name: FinanceAdvisor
description: Expert financial advisor for investment and budgeting guidance
model_name: claude-sonnet-4-20250514
temperature: 0.7
max_loops: 1
mcp_url: http://example.com/mcp # optional
---
Your subagent's system prompt goes here. This can be multiple paragraphs
and should clearly define the subagent's role, capabilities, and approach
to solving problems.
You are an expert financial advisor with deep knowledge in:
- Investment strategies and portfolio management
- Personal budgeting and financial planning
- Risk assessment and diversification
- Tax optimization strategies
- Retirement planning
Your approach:
- Provide clear, actionable financial advice
- Consider individual risk tolerance and goals
- Explain complex concepts in simple terms
- Always emphasize the importance of diversification
- Include relevant disclaimers about financial advice
Include specific instructions, best practices, and any constraints
the subagent should follow.
When analyzing financial situations:
1. Assess current financial position
2. Identify short-term and long-term goals
3. Evaluate risk tolerance
4. Recommend appropriate strategies
5. Suggest specific action steps
```
**Schema Fields:**
- `name` (required): Your sub-agent name
- `description` (required): Description of when this subagent should be invoked
- `model_name` (optional): Name of model (defaults to random selection if not provided)
- `temperature` (optional): Float value for model temperature (0.0-2.0)
- `max_loops` (optional): Integer for maximum reasoning loops
- `mcp_url` (optional): MCP server URL if needed
| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| `name` | string | ✅ Yes | - | Your agent name |
| `description` | string | ✅ Yes | - | Description of the agent's role and capabilities |
| `model_name` | string | ❌ No | "gpt-4.1" | Name of the model to use |
| `temperature` | float | ❌ No | 0.1 | Model temperature (0.0-2.0) |
| `max_loops` | integer | ❌ No | 1 | Maximum reasoning loops |
| `mcp_url` | string | ❌ No | None | MCP server URL if needed |
| `streaming_on` | boolean | ❌ No | False | Enable streaming output |
### 2. YAML Files
YAML configuration files for agent definitions:
```yaml
agents:
- name: "ResearchAgent"
description: "Research and analysis specialist"
model_name: "gpt-4"
temperature: 0.3
max_loops: 2
system_prompt: "You are a research specialist..."
```
### 3. CSV Files
CSV files with agent configurations:
```csv
name,description,model_name,temperature,max_loops
ResearchAgent,Research specialist,gpt-4,0.3,2
AnalysisAgent,Data analyst,claude-3,0.1,1
```
## Quick Start
### Loading a Single Agent
```python
from swarms.utils import load_agent_from_markdown
from swarms.structs import AgentLoader
# Initialize the loader
loader = AgentLoader()
# Load agent from markdown file
agent = load_agent_from_markdown("finance_advisor.md")
agent = loader.load_agent_from_markdown("finance_advisor.md")
# Use the agent
response = agent.run(
@ -68,13 +116,15 @@ response = agent.run(
)
```
### Loading Multiple Agents (Concurrent)
### Loading Multiple Agents
```python
from swarms.utils import load_agents_from_markdown
from swarms.structs import AgentLoader
loader = AgentLoader()
# Load agents from list of files with concurrent processing
agents = load_agents_from_markdown([
agents = loader.load_agents_from_markdown([
"market_researcher.md",
"financial_analyst.md",
"risk_analyst.md"
@ -92,6 +142,19 @@ task = "Analyze the AI healthcare market for a $50M investment."
result = workflow.run(task)
```
### Automatic File Type Detection
```python
from swarms.structs import AgentLoader
loader = AgentLoader()
# Automatically detect file type and load appropriately
agents = loader.auto("agents.yaml") # YAML file
agents = loader.auto("agents.csv") # CSV file
agents = loader.auto("agents.md") # Markdown file
```
## Class-Based Usage
### AgentLoader Class
@ -99,7 +162,7 @@ result = workflow.run(task)
For more advanced usage, use the `AgentLoader` class directly:
```python
from swarms import AgentLoader
from swarms.structs import AgentLoader
# Initialize loader
loader = AgentLoader()
@ -111,7 +174,7 @@ agent = loader.load_single_agent("path/to/agent.md")
agents = loader.load_multiple_agents(
"./agents_directory/",
concurrent=True, # Enable concurrent processing
max_workers=8 # Optional: limit worker threads
max_file_size_mb=10.0 # Limit file size for memory safety
)
# Parse markdown file without creating agent
@ -124,35 +187,97 @@ print(config.name, config.description)
You can override default configuration when loading agents:
```python
agent = load_agent_from_markdown(
agent = loader.load_agent_from_markdown(
file_path="agent.md",
max_loops=5,
verbose=True,
dashboard=True,
autosave=False,
context_length=200000
context_length=200000,
temperature=0.5
)
```
### Available Configuration Parameters
- `max_loops` (int): Maximum number of reasoning loops (default: 1)
- `autosave` (bool): Enable automatic state saving (default: True)
- `dashboard` (bool): Enable dashboard monitoring (default: False)
- `verbose` (bool): Enable verbose logging (default: False)
- `dynamic_temperature_enabled` (bool): Enable dynamic temperature (default: False)
- `saved_state_path` (str): Path for saving agent state
- `user_name` (str): User identifier (default: "default_user")
- `retry_attempts` (int): Number of retry attempts (default: 3)
- `context_length` (int): Maximum context length (default: 100000)
- `return_step_meta` (bool): Return step metadata (default: False)
- `output_type` (str): Output format type (default: "str")
- `auto_generate_prompt` (bool): Auto-generate prompts (default: False)
- `artifacts_on` (bool): Enable artifacts (default: False)
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `max_loops` | int | 1 | Maximum number of reasoning loops |
| `autosave` | bool | False | Enable automatic state saving |
| `dashboard` | bool | False | Enable dashboard monitoring |
| `verbose` | bool | False | Enable verbose logging |
| `dynamic_temperature_enabled` | bool | False | Enable dynamic temperature |
| `saved_state_path` | str | None | Path for saving agent state |
| `user_name` | str | "default_user" | User identifier |
| `retry_attempts` | int | 3 | Number of retry attempts |
| `context_length` | int | 100000 | Maximum context length |
| `return_step_meta` | bool | False | Return step metadata |
| `output_type` | str | "str" | Output format type |
| `auto_generate_prompt` | bool | False | Auto-generate prompts |
| `streaming_on` | bool | False | Enable streaming output |
| `mcp_url` | str | None | MCP server URL if needed |
## Advanced Features
## Complete Example
### Concurrent Processing
### Example: Finance Advisor Agent
The AgentLoader utilizes multiple CPU cores for concurrent agent loading:
```python
from swarms.structs import AgentLoader
loader = AgentLoader()
# Automatic concurrent processing for multiple files
agents = loader.load_agents_from_markdown([
"agent1.md", "agent2.md", "agent3.md", "agent4.md"
]) # concurrent=True by default
# Manual control over concurrency
agents = loader.load_agents_from_markdown(
"./agents_directory/",
concurrent=True, # Enable concurrent processing
max_file_size_mb=5.0 # Limit file size for memory safety
)
# Disable concurrency for debugging or single files
agents = loader.load_agents_from_markdown(
["single_agent.md"],
concurrent=False # Sequential processing
)
```
### File Size Validation
```python
# Set maximum file size to prevent memory issues
agents = loader.load_agents_from_markdown(
"./agents_directory/",
max_file_size_mb=5.0 # Skip files larger than 5MB
)
```
### Multiple File Type Support
```python
from swarms.structs import AgentLoader
loader = AgentLoader()
# Load from different file types
yaml_agents = loader.load_agents_from_yaml("agents.yaml")
csv_agents = loader.load_agents_from_csv("agents.csv")
md_agents = loader.load_agents_from_markdown("agents.md")
# Load from multiple YAML files with different return types
yaml_files = ["agents1.yaml", "agents2.yaml"]
return_types = ["auto", "list"]
agents = loader.load_many_agents_from_yaml(yaml_files, return_types)
```
## Complete Examples
### Example 1: Finance Advisor Agent
Create a file `finance_advisor.md`:
@ -160,7 +285,7 @@ Create a file `finance_advisor.md`:
---
name: FinanceAdvisor
description: Expert financial advisor for investment and budgeting guidance
model_name: gpt-4
model_name: claude-sonnet-4-20250514
temperature: 0.7
max_loops: 1
---
@ -193,10 +318,11 @@ When analyzing financial situations:
### Loading and Using the Agent
```python
from swarms.utils import load_agent_from_markdown
from swarms.structs import AgentLoader
# Load the Finance Advisor agent
agent = load_agent_from_markdown("finance_advisor.md")
loader = AgentLoader()
agent = loader.load_agent_from_markdown("finance_advisor.md")
# Use the agent for financial advice
response = agent.run(
@ -204,128 +330,85 @@ response = agent.run(
)
```
## Error Handling
The AgentLoader provides comprehensive error handling:
### Example 2: Multi-Agent Workflow
```python
from swarms import AgentLoader
from swarms.structs import AgentLoader, SequentialWorkflow
# Load multiple specialized agents
loader = AgentLoader()
agents = loader.load_agents_from_markdown([
"market_researcher.md",
"financial_analyst.md",
"risk_analyst.md"
], concurrent=True)
try:
# This will raise FileNotFoundError
agent = loader.load_single_agent("nonexistent.md")
except FileNotFoundError as e:
print(f"File not found: {e}")
try:
# This will handle parsing errors gracefully
agents = loader.load_multiple_agents("./invalid_directory/")
print(f"Successfully loaded {len(agents)} agents")
except Exception as e:
print(f"Error loading agents: {e}")
```
## Concurrent Processing Features
### Multi-Core Performance
The AgentLoader utilizes 100% of CPU cores for concurrent agent loading, providing significant performance improvements when processing multiple markdown files:
```python
from swarms.utils import load_agents_from_markdown
# Automatic concurrent processing for multiple files
agents = load_agents_from_markdown([
"agent1.md", "agent2.md", "agent3.md", "agent4.md"
]) # concurrent=True by default
# Manual control over concurrency
agents = load_agents_from_markdown(
"./agents_directory/",
concurrent=True, # Enable concurrent processing
max_workers=8 # Limit to 8 worker threads
)
# Disable concurrency for debugging or single files
agents = load_agents_from_markdown(
["single_agent.md"],
concurrent=False # Sequential processing
# Create a sequential workflow
workflow = SequentialWorkflow(
agents=agents,
max_loops=1
)
```
### Resource Management
```python
# Default: Uses all CPU cores
agents = load_agents_from_markdown(files, concurrent=True)
# Custom worker count for resource control
agents = load_agents_from_markdown(
files,
concurrent=True,
max_workers=4 # Limit to 4 threads
)
# Execute complex task across multiple agents
task = """
Analyze the AI healthcare market for a $50M investment opportunity.
Focus on market size, competition, financials, and risks.
"""
# ThreadPoolExecutor automatically manages:
# - Thread lifecycle
# - Resource cleanup
# - Exception handling
# - Result collection
result = workflow.run(task)
```
## Advanced Features
### Custom System Prompt Building
The AgentLoader automatically builds comprehensive system prompts from the markdown structure:
### Example 3: Mixed File Types
```python
from swarms.structs import AgentLoader
loader = AgentLoader()
config = loader.parse_markdown_file("agent.md")
# The system prompt includes:
# - Role description from the table
# - Focus areas as bullet points
# - Approach as numbered steps
# - Expected outputs as deliverables
# Load agents from different file types
markdown_agents = loader.load_agents_from_markdown("./md_agents/")
yaml_agents = loader.load_agents_from_yaml("config.yaml")
csv_agents = loader.load_agents_from_csv("data.csv")
print("Generated System Prompt:")
print(config.system_prompt)
```
# Combine all agents
all_agents = markdown_agents + yaml_agents + csv_agents
print(f"Loaded {len(all_agents)} agents from various sources")
```
## Integration with Swarms
## Error Handling
The loaded agents are fully compatible with Swarms orchestration systems:
The AgentLoader provides comprehensive error handling:
```python
from swarms.utils import load_agents_from_markdown
from swarms.structs import SequentialWorkflow
from swarms.structs import AgentLoader
# Load multiple specialized agents
agents = load_agents_from_markdown("./specialist_agents/")
loader = AgentLoader()
# Create a sequential workflow
workflow = SequentialWorkflow(
agents=agents,
max_loops=1
)
try:
# This will raise FileNotFoundError
agent = loader.load_agent_from_markdown("nonexistent.md")
except FileNotFoundError as e:
print(f"File not found: {e}")
# Execute complex task across multiple agents
result = workflow.run("Conduct a comprehensive system audit")
try:
# This will handle parsing errors gracefully
agents = loader.load_multiple_agents("./invalid_directory/")
print(f"Successfully loaded {len(agents)} agents")
except Exception as e:
print(f"Error loading agents: {e}")
```
## Best Practices
1. **Consistent Naming**: Use clear, descriptive agent names
2. **Detailed Descriptions**: Provide comprehensive role descriptions
3. **Structured Sections**: Use the optional sections to define agent behavior
3. **Structured Content**: Use clear sections to define agent behavior
4. **Error Handling**: Always wrap agent loading in try-catch blocks
5. **Model Selection**: Choose appropriate models based on agent complexity
6. **Configuration**: Override defaults when specific behavior is needed
7. **File Organization**: Organize agents by domain or function
8. **Memory Management**: Use `max_file_size_mb` for large agent collections
## API Reference
@ -333,57 +416,243 @@ result = workflow.run("Conduct a comprehensive system audit")
```python
class AgentLoader:
def __init__(self, model: Optional[LiteLLM] = None)
def parse_markdown_file(self, file_path: str) -> MarkdownAgentConfig
def load_single_agent(self, file_path: str, **kwargs) -> Agent
def load_multiple_agents(self, file_paths: Union[str, List[str]], **kwargs) -> List[Agent]
"""
Loader class for creating Agent objects from various file formats.
This class provides methods to load agents from Markdown, YAML, and CSV files.
"""
def __init__(self):
"""Initialize the AgentLoader instance."""
pass
def load_agents_from_markdown(
self,
file_paths: Union[str, List[str]],
concurrent: bool = True,
max_file_size_mb: float = 10.0,
**kwargs
) -> List[Agent]:
"""
Load multiple agents from one or more Markdown files.
Args:
file_paths: Path or list of paths to Markdown file(s)
concurrent: Whether to load files concurrently
max_file_size_mb: Maximum file size in MB to process
**kwargs: Additional keyword arguments passed to the underlying loader
Returns:
A list of loaded Agent objects
"""
def load_agent_from_markdown(
self,
file_path: str,
**kwargs
) -> Agent:
"""
Load a single agent from a Markdown file.
Args:
file_path: Path to the Markdown file containing the agent definition
**kwargs: Additional keyword arguments passed to the underlying loader
Returns:
The loaded Agent object
"""
def load_agents_from_yaml(
self,
yaml_file: str,
return_type: ReturnTypes = "auto",
**kwargs
) -> List[Agent]:
"""
Load agents from a YAML file.
Args:
yaml_file: Path to the YAML file containing agent definitions
return_type: The return type for the loader
**kwargs: Additional keyword arguments passed to the underlying loader
Returns:
A list of loaded Agent objects
"""
def load_agents_from_csv(
self,
csv_file: str,
**kwargs
) -> List[Agent]:
"""
Load agents from a CSV file.
Args:
csv_file: Path to the CSV file containing agent definitions
**kwargs: Additional keyword arguments passed to the underlying loader
Returns:
A list of loaded Agent objects
"""
def auto(
self,
file_path: str,
*args,
**kwargs
):
"""
Automatically load agents from a file based on its extension.
Args:
file_path: Path to the agent file (Markdown, YAML, or CSV)
*args: Additional positional arguments passed to the underlying loader
**kwargs: Additional keyword arguments passed to the underlying loader
Returns:
A list of loaded Agent objects
Raises:
ValueError: If the file type is not supported
"""
```
**Method Parameters and Return Types:**
| Method | Parameters | Type | Required | Default | Return Type | Description |
|--------|------------|------|----------|---------|-------------|-------------|
| `load_agents_from_markdown` | `file_paths` | Union[str, List[str]] | ✅ Yes | - | List[Agent] | File path(s) or directory |
| `load_agents_from_markdown` | `concurrent` | bool | ❌ No | True | List[Agent] | Enable concurrent processing |
| `load_agents_from_markdown` | `max_file_size_mb` | float | ❌ No | 10.0 | List[Agent] | Max file size in MB |
| `load_agents_from_markdown` | `**kwargs` | dict | ❌ No | {} | List[Agent] | Configuration overrides |
| `load_agent_from_markdown` | `file_path` | str | ✅ Yes | - | Agent | Path to markdown file |
| `load_agent_from_markdown` | `**kwargs` | dict | ❌ No | {} | Agent | Configuration overrides |
| `load_agents_from_yaml` | `yaml_file` | str | ✅ Yes | - | List[Agent] | Path to YAML file |
| `load_agents_from_yaml` | `return_type` | ReturnTypes | ❌ No | "auto" | List[Agent] | Return type for loader |
| `load_agents_from_yaml` | `**kwargs` | dict | ❌ No | {} | List[Agent] | Configuration overrides |
| `load_agents_from_csv` | `csv_file` | str | ✅ Yes | - | List[Agent] | Path to CSV file |
| `load_agents_from_csv` | `**kwargs` | dict | ❌ No | {} | List[Agent] | Configuration overrides |
| `auto` | `file_path` | str | ✅ Yes | - | List[Agent] | Path to agent file |
| `auto` | `*args` | tuple | ❌ No | () | List[Agent] | Positional arguments |
| `auto` | `**kwargs` | dict | ❌ No | {} | List[Agent] | Keyword arguments |
### Convenience Functions
```python
def load_agent_from_markdown(file_path: str, **kwargs) -> Agent
def load_agent_from_markdown(
file_path: str,
**kwargs
) -> Agent:
"""
Load a single agent from a markdown file using the Claude Code YAML frontmatter format.
Args:
file_path: Path to the markdown file containing YAML frontmatter
**kwargs: Optional keyword arguments to override agent configuration
Returns:
Configured Agent instance loaded from the markdown file
"""
def load_agents_from_markdown(
file_paths: Union[str, List[str]],
concurrent: bool = True, # Enable concurrent processing
max_workers: Optional[int] = None, # Max worker threads (defaults to CPU count)
concurrent: bool = True,
max_file_size_mb: float = 10.0,
**kwargs
) -> List[Agent]
) -> List[Agent]:
"""
Load multiple agents from markdown files using the Claude Code YAML frontmatter format.
Args:
file_paths: Either a directory path containing markdown files or a list of markdown file paths
concurrent: If True, enables concurrent processing for faster loading
max_file_size_mb: Maximum file size (in MB) for each markdown file
**kwargs: Optional keyword arguments to override agent configuration
Returns:
List of configured Agent instances loaded from the markdown files
"""
```
**Function Parameters:**
| Function | Parameter | Type | Required | Default | Description |
|----------|-----------|------|----------|---------|-------------|
| `load_agent_from_markdown` | `file_path` | str | ✅ Yes | - | Path to markdown file |
| `load_agent_from_markdown` | `**kwargs` | dict | ❌ No | {} | Configuration overrides |
| `load_agents_from_markdown` | `file_paths` | Union[str, List[str]] | ✅ Yes | - | File path(s) or directory |
| `load_agents_from_markdown` | `concurrent` | bool | ❌ No | True | Enable concurrent processing |
| `load_agents_from_markdown` | `max_file_size_mb` | float | ❌ No | 10.0 | Max file size in MB |
| `load_agents_from_markdown` | `**kwargs` | dict | ❌ No | {} | Configuration overrides |
### Configuration Model
```python
class MarkdownAgentConfig(BaseModel):
name: str
description: str
model_name: Optional[str] = "gpt-4"
temperature: Optional[float] = 0.1 # Model temperature (0.0-2.0)
mcp_url: Optional[str] = None # Optional MCP server URL
system_prompt: str
max_loops: int = 1
autosave: bool = False
dashboard: bool = False
verbose: bool = False
# ... additional configuration fields
"""Configuration model for agents loaded from Claude Code markdown files."""
name: Optional[str] = None
description: Optional[str] = None
model_name: Optional[str] = "gpt-4.1"
temperature: Optional[float] = Field(default=0.1, ge=0.0, le=2.0)
mcp_url: Optional[int] = None
system_prompt: Optional[str] = None
max_loops: Optional[int] = Field(default=1, ge=1)
autosave: Optional[bool] = False
dashboard: Optional[bool] = False
verbose: Optional[bool] = False
dynamic_temperature_enabled: Optional[bool] = False
saved_state_path: Optional[str] = None
user_name: Optional[str] = "default_user"
retry_attempts: Optional[int] = Field(default=3, ge=1)
context_length: Optional[int] = Field(default=100000, ge=1000)
return_step_meta: Optional[bool] = False
output_type: Optional[str] = "str"
auto_generate_prompt: Optional[bool] = False
streaming_on: Optional[bool] = False
```
**MarkdownAgentConfig Schema:**
| Field | Type | Required | Default | Validation | Description |
|-------|------|----------|---------|------------|-------------|
| `name` | Optional[str] | ❌ No | None | - | Agent name |
| `description` | Optional[str] | ❌ No | None | - | Agent description |
| `model_name` | Optional[str] | ❌ No | "gpt-4.1" | - | Model to use |
| `temperature` | Optional[float] | ❌ No | 0.1 | 0.0 ≤ x ≤ 2.0 | Model temperature |
| `mcp_url` | Optional[int] | ❌ No | None | - | MCP server URL |
| `system_prompt` | Optional[str] | ❌ No | None | Non-empty string | System prompt |
| `max_loops` | Optional[int] | ❌ No | 1 | ≥ 1 | Maximum reasoning loops |
| `autosave` | Optional[bool] | ❌ No | False | - | Enable auto-save |
| `dashboard` | Optional[bool] | ❌ No | False | - | Enable dashboard |
| `verbose` | Optional[bool] | ❌ No | False | - | Enable verbose logging |
| `dynamic_temperature_enabled` | Optional[bool] | ❌ No | False | - | Enable dynamic temperature |
| `saved_state_path` | Optional[str] | ❌ No | None | - | State save path |
| `user_name` | Optional[str] | ❌ No | "default_user" | - | User identifier |
| `retry_attempts` | Optional[int] | ❌ No | 3 | ≥ 1 | Retry attempts |
| `context_length` | Optional[int] | ❌ No | 100000 | ≥ 1000 | Context length |
| `return_step_meta` | Optional[bool] | ❌ No | False | - | Return step metadata |
| `output_type` | Optional[str] | ❌ No | "str" | - | Output format |
| `auto_generate_prompt` | Optional[bool] | ❌ No | False | - | Auto-generate prompts |
| `streaming_on` | Optional[bool] | ❌ No | False | - | Enable streaming |
## Examples Repository
Find complete working examples in the `examples/agent_loader/` directory:
Find complete working examples in the `examples/utils/agent_loader/` directory:
### Single Agent Example (`agent_loader_demo.py`)
```python
from swarms.utils import load_agent_from_markdown
agent = load_agent_from_markdown("finance_advisor.md")
agent.run(
task="Analyze the financial market trends for 2023."
)
agent.run(task="What were the best performing etfs in 2023")
```
### Multi-Agent Workflow Example (`multi_agents_loader_demo.py`)
```python
from swarms.utils import load_agents_from_markdown
@ -410,11 +679,12 @@ result = workflow.run(task)
```
### Sample Agent Definition (`finance_advisor.md`)
```markdown
---
name: FinanceAdvisor
description: Expert financial advisor for investment and budgeting guidance
model_name: gpt-4o
model_name: claude-sonnet-4-20250514
temperature: 0.7
max_loops: 1
---
@ -444,6 +714,67 @@ When analyzing financial situations:
5. Suggest specific action steps
```
## Performance Considerations
### Concurrent Processing
- **Default Behavior**: Uses `os.cpu_count() * 2` worker threads
- **Memory Management**: Automatically validates file sizes before processing
- **Timeout Handling**: 5-minute total timeout, 1-minute per agent timeout
- **Error Recovery**: Continues processing other files if individual files fail
### File Size Limits
- **Default Limit**: 10MB maximum file size
- **Configurable**: Adjustable via `max_file_size_mb` parameter
- **Memory Safety**: Prevents memory issues with large agent definitions
### Resource Optimization
```python
# For large numbers of agents, consider batch processing
loader = AgentLoader()
# Process in smaller batches
batch1 = loader.load_agents_from_markdown("./batch1/", concurrent=True)
batch2 = loader.load_agents_from_markdown("./batch2/", concurrent=True)
# Or limit concurrent workers for resource-constrained environments
agents = loader.load_agents_from_markdown(
"./agents/",
concurrent=True,
max_file_size_mb=5.0 # Smaller files for faster processing
)
```
## Troubleshooting
### Common Issues
1. **File Not Found**: Ensure file paths are correct and files exist
2. **YAML Parsing Errors**: Check YAML frontmatter syntax in markdown files
3. **Memory Issues**: Reduce `max_file_size_mb` or process files in smaller batches
4. **Timeout Errors**: Check file sizes and network connectivity for remote files
5. **Configuration Errors**: Verify all required fields are present in agent definitions
### Debug Mode
```python
import logging
from swarms.structs import AgentLoader
# Enable debug logging
logging.basicConfig(level=logging.DEBUG)
loader = AgentLoader()
# Load with verbose output
agent = loader.load_agent_from_markdown(
"agent.md",
verbose=True
)
```
## Support
For questions and support:

@ -8,7 +8,7 @@ agent = Agent(
dynamic_temperature_enabled=True,
max_loops=1,
dynamic_context_window=True,
streaming_on=True,
streaming_on=False,
)
out = agent.run(

@ -1,42 +0,0 @@
import asyncio
from browser_use import Agent
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from swarms import ConcurrentWorkflow
load_dotenv()
class BrowserAgent:
def __init__(self, agent_name: str = "BrowserAgent"):
self.agent_name = agent_name
async def browser_agent_test(self, task: str):
agent = Agent(
task=task,
llm=ChatOpenAI(model="gpt-4o"),
)
result = await agent.run()
return result
def run(self, task: str):
return asyncio.run(self.browser_agent_test(task))
swarm = ConcurrentWorkflow(
agents=[BrowserAgent() for _ in range(10)],
)
swarm.run(
"""Please navigate to chat.com and engage in a detailed technical discussion with ChatGPT about the following specific aspects of future high-energy physics:
1. The potential discoveries and physics reach of the Future Circular Collider (FCC) compared to the LHC
2. Theoretical predictions for supersymmetric particles and dark matter candidates at energy scales above 100 TeV
3. Novel detector technologies needed for future collider experiments, particularly for tracking and calorimetry
4. The role of machine learning and quantum computing in analyzing high-energy physics data
5. Challenges and proposed solutions for beam focusing and acceleration at extremely high energies
Please document the key insights and technical details from this discussion."""
)

@ -1,118 +0,0 @@
import os
from dotenv import load_dotenv
from swarms import Agent, SequentialWorkflow
from swarm_models import OpenAIChat
load_dotenv()
# Get the OpenAI API key from the environment variable
api_key = os.getenv("GROQ_API_KEY")
# Model
model = OpenAIChat(
openai_api_base="https://api.groq.com/openai/v1",
openai_api_key=api_key,
model_name="llama-3.1-70b-versatile",
temperature=0.1,
)
# Initialize specialized agents
data_extractor_agent = Agent(
agent_name="Data-Extractor",
system_prompt=None,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="data_extractor_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
summarizer_agent = Agent(
agent_name="Document-Summarizer",
system_prompt=None,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="summarizer_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
financial_analyst_agent = Agent(
agent_name="Financial-Analyst",
system_prompt=None,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="financial_analyst_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
market_analyst_agent = Agent(
agent_name="Market-Analyst",
system_prompt=None,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="market_analyst_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
operational_analyst_agent = Agent(
agent_name="Operational-Analyst",
system_prompt=None,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="operational_analyst_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
# Initialize the SwarmRouter
router = SequentialWorkflow(
name="pe-document-analysis-swarm",
description="Analyze documents for private equity due diligence and investment decision-making",
max_loops=1,
agents=[
data_extractor_agent,
summarizer_agent,
financial_analyst_agent,
market_analyst_agent,
operational_analyst_agent,
],
output_type="all",
)
# Example usage
if __name__ == "__main__":
# Run a comprehensive private equity document analysis task
result = router.run(
"Where is the best place to find template term sheets for series A startups. Provide links and references",
img=None,
)
print(result)

@ -1,27 +1,10 @@
import os
from dotenv import load_dotenv
from swarms import Agent, SequentialWorkflow
from swarm_models import OpenAIChat
load_dotenv()
# Get the OpenAI API key from the environment variable
api_key = os.getenv("GROQ_API_KEY")
# Model
model = OpenAIChat(
openai_api_base="https://api.groq.com/openai/v1",
openai_api_key=api_key,
model_name="llama-3.1-70b-versatile",
temperature=0.1,
)
# Initialize specialized agents
data_extractor_agent = Agent(
agent_name="Data-Extractor",
system_prompt=None,
llm=model,
model_name="gpt-4.1",
max_loops=1,
autosave=True,
verbose=True,
@ -36,7 +19,7 @@ data_extractor_agent = Agent(
summarizer_agent = Agent(
agent_name="Document-Summarizer",
system_prompt=None,
llm=model,
model_name="gpt-4.1",
max_loops=1,
autosave=True,
verbose=True,
@ -51,7 +34,7 @@ summarizer_agent = Agent(
financial_analyst_agent = Agent(
agent_name="Financial-Analyst",
system_prompt=None,
llm=model,
model_name="gpt-4.1",
max_loops=1,
autosave=True,
verbose=True,
@ -66,7 +49,7 @@ financial_analyst_agent = Agent(
market_analyst_agent = Agent(
agent_name="Market-Analyst",
system_prompt=None,
llm=model,
model_name="gpt-4.1",
max_loops=1,
autosave=True,
verbose=True,
@ -81,7 +64,7 @@ market_analyst_agent = Agent(
operational_analyst_agent = Agent(
agent_name="Operational-Analyst",
system_prompt=None,
llm=model,
model_name="gpt-4.1",
max_loops=1,
autosave=True,
verbose=True,

@ -1,143 +0,0 @@
import os
from dotenv import load_dotenv
from swarms import Agent, SequentialWorkflow
from swarm_models import OpenAIChat
load_dotenv()
# Get the OpenAI API key from the environment variable
api_key = os.getenv("GROQ_API_KEY")
# Model
model = OpenAIChat(
openai_api_base="https://api.groq.com/openai/v1",
openai_api_key=api_key,
model_name="llama-3.1-70b-versatile",
temperature=0.1,
)
# Initialize specialized agents
data_extractor_agent = Agent(
agent_name="Data-Extractor",
system_prompt="""You are a data extraction specialist. Your role is to:
1. Extract key information, data points, and metrics from documents
2. Identify and pull out important facts, figures, and statistics
3. Structure extracted data in a clear, organized format
4. Flag any inconsistencies or missing data
5. Ensure accuracy in data extraction while maintaining context""",
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="data_extractor_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
summarizer_agent = Agent(
agent_name="Document-Summarizer",
system_prompt="""You are a document summarization expert. Your role is to:
1. Create concise, comprehensive summaries of documents
2. Highlight key points and main takeaways
3. Maintain the essential meaning while reducing length
4. Structure summaries in a logical, readable format
5. Identify and emphasize critical insights""",
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="summarizer_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
financial_analyst_agent = Agent(
agent_name="Financial-Analyst",
system_prompt="""You are a financial analysis expert. Your role is to:
1. Analyze financial statements and metrics
2. Evaluate company valuations and financial projections
3. Assess financial risks and opportunities
4. Provide insights on financial performance and health
5. Make recommendations based on financial analysis""",
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="financial_analyst_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
market_analyst_agent = Agent(
agent_name="Market-Analyst",
system_prompt="""You are a market analysis expert. Your role is to:
1. Analyze market trends and dynamics
2. Evaluate competitive landscape and market positioning
3. Identify market opportunities and threats
4. Assess market size and growth potential
5. Provide strategic market insights and recommendations""",
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="market_analyst_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
operational_analyst_agent = Agent(
agent_name="Operational-Analyst",
system_prompt="""You are an operational analysis expert. Your role is to:
1. Analyze business operations and processes
2. Evaluate operational efficiency and effectiveness
3. Identify operational risks and opportunities
4. Assess scalability and growth potential
5. Provide recommendations for operational improvements""",
llm=model,
max_loops=2,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="operational_analyst_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
# Initialize the SwarmRouter
router = SequentialWorkflow(
name="pe-document-analysis-swarm",
description="Analyze documents for private equity due diligence and investment decision-making",
max_loops=1,
agents=[
data_extractor_agent,
summarizer_agent,
financial_analyst_agent,
market_analyst_agent,
operational_analyst_agent,
],
output_type="all",
)
# Example usage
if __name__ == "__main__":
# Run a comprehensive private equity document analysis task
result = router.run(
"Where is the best place to find template term sheets for series A startups. Provide links and references",
no_use_clusterops=True,
)
print(result)

@ -9,10 +9,9 @@ market_researcher = Agent(
3. Evaluating competitor strategies
4. Assessing customer needs and preferences
5. Providing actionable market insights""",
model_name="claude-3-sonnet-20240229",
model_name="gpt-4.1",
max_loops=1,
temperature=0.7,
streaming_on=True,
)
# Initialize financial analyst agent
@ -24,9 +23,8 @@ financial_analyst = Agent(
3. Assessing risk factors
4. Providing financial forecasts
5. Recommending financial strategies""",
model_name="claude-3-sonnet-20240229",
model_name="gpt-4.1",
max_loops=1,
streaming_on=True,
temperature=0.7,
)
@ -39,10 +37,9 @@ technical_analyst = Agent(
3. Identifying support and resistance levels
4. Assessing market momentum
5. Providing trading recommendations""",
model_name="claude-3-sonnet-20240229",
model_name="gpt-4.1",
max_loops=1,
temperature=0.7,
streaming_on=True,
)
# Create list of agents
@ -53,11 +50,9 @@ router = SequentialWorkflow(
name="market-analysis-router",
agents=agents,
max_loops=1,
# output_type="all",
team_awareness=True,
)
result = router.run(
"Analyze Tesla (TSLA) stock from market, financial, and technical perspectives"
)
result = router.run("What are the best 3 oil ETFs?")
print(result)

@ -0,0 +1,67 @@
import asyncio
from browser_use import Agent
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from swarms import ConcurrentWorkflow
load_dotenv()
class BrowserAgent:
def __init__(self, agent_name: str = "BrowserAgent"):
"""
Initialize a BrowserAgent with a given name.
Args:
agent_name (str): The name of the browser agent.
"""
self.agent_name = agent_name
async def browser_agent_test(self, task: str):
"""
Asynchronously run the browser agent on a given task.
Args:
task (str): The task prompt for the agent.
Returns:
Any: The result of the agent's run method.
"""
agent = Agent(
task=task,
llm=ChatOpenAI(model="gpt-4.1"),
)
result = await agent.run()
return result
def run(self, task: str):
"""
Run the browser agent synchronously on a given task.
Args:
task (str): The task prompt for the agent.
Returns:
Any: The result of the agent's run method.
"""
return asyncio.run(self.browser_agent_test(task))
swarm = ConcurrentWorkflow(
agents=[BrowserAgent() for _ in range(10)],
)
swarm.run(
"""Please navigate to https://www.coingecko.com and identify the best performing cryptocurrency coin over the past 24 hours.
Your task is to:
1. Go to the main page of CoinGecko.
2. Locate the list of coins and their performance metrics.
3. Determine which coin has the highest positive percentage change in price over the last 24 hours.
4. Report the name, symbol, and 24h percentage gain of this top-performing coin.
5. Briefly describe any notable trends or observations about this coin's recent performance.
Please provide your findings in a clear and concise summary."""
)

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "8.1.2"
version = "8.1.3"
description = "Swarms - TGSC"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]

@ -319,13 +319,20 @@ You are part of a collaborative multi-agent system. Work together to solve compl
### Core Principles
1. **Clarity**: Restate tasks in your own words.
2. **Role Awareness**: Know your role and don't assume others' roles.
3. **Communication**: Share all relevant information and acknowledge others' inputs.
3. **Communication**: Share all relevant information and actively acknowledge and reference other agents' outputs.
4. **Verification**: Use the 3C Protocol (Completeness, Coherence, Correctness).
5. **Reflection**: Continuously evaluate your actions and their impact.
### Key Protocols
- Before acting: Verify if task is already done by others
- During execution: Share reasoning and intermediate steps
- After completion: Get verification from at least one other agent
- Always: Explain your rationale and acknowledge others' contributions
- Before acting: Check and acknowledge if the task or part of it has already been completed by another agent. Reference their output explicitly.
- During execution: Share your reasoning and intermediate steps. When building on or differing from another agent's output, mention them by name and explain your reasoning.
- After completion: Request verification from at least one other agent, and acknowledge their feedback or validation.
- Always: Clearly explain your rationale, and explicitly acknowledge, reference, or build upon the outputs and contributions of other agents.
#### Example Acknowledgement Phrases
- "Building on Agent-2's previous output, I propose..."
- "I have reviewed Agent-3's result and confirm it is correct."
- "Agent-1 raised a good point regarding step 2; I will address it as follows..."
- "My output differs from Agent-4's because..."
"""

@ -1,5 +1,6 @@
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
from typing import Dict, List, Any, Optional
class MCPConnection(BaseModel):
@ -37,7 +38,7 @@ class MCPConnection(BaseModel):
class MultipleMCPConnections(BaseModel):
connections: List[MCPConnection] = Field(
default=[], description="List of MCP connections"
description="List of MCP connections"
)
class Config:

@ -103,6 +103,7 @@ from swarms.structs.swarming_architectures import (
staircase_swarm,
star_swarm,
)
from swarms.structs.agent_loader import AgentLoader
__all__ = [
"Agent",
@ -190,4 +191,5 @@ __all__ = [
"check_cancelled",
"check_exit",
"check_end",
"AgentLoader",
]

@ -52,6 +52,7 @@ from swarms.schemas.base_schemas import (
from swarms.schemas.conversation_schema import ConversationSchema
from swarms.schemas.mcp_schemas import (
MCPConnection,
MultipleMCPConnections,
)
from swarms.structs.agent_roles import agent_roles
from swarms.structs.conversation import Conversation
@ -419,6 +420,7 @@ class Agent:
safety_prompt_on: bool = False,
random_models_on: bool = False,
mcp_config: Optional[MCPConnection] = None,
mcp_configs: Optional[MultipleMCPConnections] = None,
top_p: Optional[float] = 0.90,
conversation_schema: Optional[ConversationSchema] = None,
llm_base_url: Optional[str] = None,
@ -571,6 +573,7 @@ class Agent:
self.reasoning_prompt_on = reasoning_prompt_on
self.dynamic_context_window = dynamic_context_window
self.show_tool_execution_output = show_tool_execution_output
self.mcp_configs = mcp_configs
# self.init_handling()
self.setup_config()

@ -6,11 +6,11 @@ from swarms.agents.create_agents_from_yaml import (
create_agents_from_yaml,
)
from swarms.structs.agent import Agent
from swarms.structs.csv_to_agent import AgentLoader as CSVAgentLoader
from swarms.structs.csv_to_agent import CSVAgentLoader
from swarms.utils.agent_loader_markdown import (
load_agent_from_markdown,
load_agents_from_markdown,
AgentLoader as MarkdownAgentLoader,
MarkdownAgentLoader,
)
@ -21,10 +21,11 @@ class AgentLoader:
This class provides methods to load agents from Markdown, YAML, and CSV files.
"""
def __init__(self):
def __init__(self, concurrent: bool = True):
"""
Initialize the AgentLoader instance.
"""
self.concurrent = concurrent
pass
def load_agents_from_markdown(

@ -1,12 +1,9 @@
import json
import uuid
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Optional, Union
from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.conversation import Conversation
from swarms.structs.multi_agent_exec import get_agents_info
from swarms.telemetry.main import log_agent_data
from swarms.utils.any_to_str import any_to_str
from swarms.utils.history_output_formatter import (
@ -14,51 +11,12 @@ from swarms.utils.history_output_formatter import (
)
from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.output_types import OutputType
from swarms.structs.swarm_id import swarm_id
logger = initialize_logger(log_folder="rearrange")
def swarm_id():
return uuid.uuid4().hex
class AgentRearrange(BaseSwarm):
"""
A class representing a swarm of agents for rearranging tasks.
Attributes:
id (str): Unique identifier for the swarm
name (str): Name of the swarm
description (str): Description of the swarm's purpose
agents (callable): Dictionary mapping agent names to Agent objects
flow (str): The flow pattern defining task execution order
max_loops (int): Maximum number of execution loops
verbose (bool): Whether to enable verbose logging
memory_system (BaseVectorDatabase): Memory system for storing agent interactions
human_in_the_loop (bool): Whether human intervention is enabled
custom_human_in_the_loop (Callable): Custom function for human intervention
return_json (bool): Whether to return output in JSON format
output_type (OutputType): Format of output ("all", "final", "list", or "dict")
swarm_history (dict): History of agent interactions
input_config (AgentRearrangeInput): Input configuration schema
output_schema (AgentRearrangeOutput): Output schema
Methods:
__init__(): Initializes the AgentRearrange object
reliability_checks(): Validates swarm configuration
set_custom_flow(): Sets a custom flow pattern
add_agent(): Adds an agent to the swarm
track_history(): Records agent interaction history
remove_agent(): Removes an agent from the swarm
add_agents(): Adds multiple agents to the swarm
validate_flow(): Validates the flow pattern
run(): Executes the swarm's task processing
astream(): Runs the swarm with streaming output
batch_run(): Processes multiple tasks in batches
abatch_run(): Asynchronously processes multiple tasks in batches
concurrent_run(): Processes multiple tasks concurrently
"""
class AgentRearrange:
def __init__(
self,
@ -74,29 +32,17 @@ class AgentRearrange(BaseSwarm):
custom_human_in_the_loop: Optional[
Callable[[str], str]
] = None,
return_json: bool = False,
output_type: OutputType = "all",
docs: List[str] = None,
doc_folder: str = None,
device: str = "cpu",
device_id: int = 0,
all_cores: bool = False,
all_gpus: bool = True,
no_use_clusterops: bool = True,
autosave: bool = True,
return_entire_history: bool = False,
rules: str = None,
team_awareness: bool = False,
time_enabled: bool = False,
message_id_on: bool = False,
*args,
**kwargs,
):
super(AgentRearrange, self).__init__(
name=name,
description=description,
agents=agents if agents else [],
*args,
**kwargs,
)
self.name = name
self.description = description
self.id = id
self.agents = {agent.agent_name: agent for agent in agents}
self.flow = flow if flow is not None else ""
@ -105,29 +51,46 @@ class AgentRearrange(BaseSwarm):
self.memory_system = memory_system
self.human_in_the_loop = human_in_the_loop
self.custom_human_in_the_loop = custom_human_in_the_loop
self.return_json = return_json
self.output_type = output_type
self.docs = docs
self.doc_folder = doc_folder
self.device = device
self.device_id = device_id
self.all_cores = all_cores
self.all_gpus = all_gpus
self.no_use_clusterops = no_use_clusterops
self.autosave = autosave
self.return_entire_history = return_entire_history
self.time_enabled = time_enabled
self.message_id_on = message_id_on
self.conversation = Conversation(
time_enabled=False, token_count=False
name=f"{self.name}-Conversation",
time_enabled=self.time_enabled,
token_count=False,
message_id_on=self.message_id_on,
)
if rules:
self.conversation.add("User", rules)
self.conversation.add("user", rules)
if team_awareness is True:
agents_info = get_agents_info(self.agents, self.name)
# agents_info = get_agents_info(agents=self.agents, team_name=self.name)
# Add sequential flow information if available
sequential_info = self._get_sequential_flow_info()
if sequential_info:
# agents_info += "\n\n" + sequential_info
self.conversation.add("system", sequential_info)
# self.conversation.add("system", agents_info)
self.conversation.add("Your Swarm", agents_info)
self.reliability_check()
def reliability_check(self):
if self.agents is None or len(self.agents) == 0:
raise ValueError("Agents list cannot be None or empty")
if self.max_loops == 0:
raise ValueError("max_loops cannot be 0")
if self.flow is None or self.flow == "":
raise ValueError("flow cannot be None or empty")
if self.output_type is None or self.output_type == "":
raise ValueError("output_type cannot be None or empty")
def set_custom_flow(self, flow: str):
self.flow = flow
@ -213,6 +176,136 @@ class AgentRearrange(BaseSwarm):
logger.info(f"Flow: {self.flow} is valid.")
return True
def _get_sequential_awareness(
self, agent_name: str, tasks: List[str]
) -> str:
"""
Determines the sequential awareness information for an agent in a sequential flow.
Args:
agent_name (str): The name of the current agent.
tasks (List[str]): The list of tasks in the flow.
Returns:
str: A string describing the agents ahead and behind in the sequence.
"""
# Find the position of the current agent in the flow
agent_position = None
for i, task in enumerate(tasks):
agent_names = [name.strip() for name in task.split(",")]
if agent_name in agent_names:
agent_position = i
break
if agent_position is None:
return ""
awareness_info = []
# Check if there's an agent before (ahead in the sequence)
if agent_position > 0:
prev_task = tasks[agent_position - 1]
prev_agents = [
name.strip() for name in prev_task.split(",")
]
if (
prev_agents and prev_agents[0] != "H"
): # Skip human agents
awareness_info.append(
f"Agent ahead: {', '.join(prev_agents)}"
)
# Check if there's an agent after (behind in the sequence)
if agent_position < len(tasks) - 1:
next_task = tasks[agent_position + 1]
next_agents = [
name.strip() for name in next_task.split(",")
]
if (
next_agents and next_agents[0] != "H"
): # Skip human agents
awareness_info.append(
f"Agent behind: {', '.join(next_agents)}"
)
if awareness_info:
return (
f"Sequential awareness: {' | '.join(awareness_info)}"
)
return ""
def _get_sequential_flow_info(self) -> str:
"""
Gets information about the overall sequential flow structure.
Returns:
str: A string describing the sequential flow structure.
"""
if not self.flow or "->" not in self.flow:
return ""
tasks = self.flow.split("->")
flow_info = []
for i, task in enumerate(tasks):
agent_names = [name.strip() for name in task.split(",")]
if (
agent_names and agent_names[0] != "H"
): # Skip human agents
position_info = (
f"Step {i+1}: {', '.join(agent_names)}"
)
if i > 0:
prev_task = tasks[i - 1]
prev_agents = [
name.strip() for name in prev_task.split(",")
]
if prev_agents and prev_agents[0] != "H":
position_info += (
f" (follows: {', '.join(prev_agents)})"
)
if i < len(tasks) - 1:
next_task = tasks[i + 1]
next_agents = [
name.strip() for name in next_task.split(",")
]
if next_agents and next_agents[0] != "H":
position_info += (
f" (leads to: {', '.join(next_agents)})"
)
flow_info.append(position_info)
if flow_info:
return "Sequential Flow Structure:\n" + "\n".join(
flow_info
)
return ""
def get_agent_sequential_awareness(self, agent_name: str) -> str:
"""
Gets the sequential awareness information for a specific agent.
Args:
agent_name (str): The name of the agent to get awareness for.
Returns:
str: A string describing the agents ahead and behind in the sequence.
"""
if not self.flow or "->" not in self.flow:
return ""
tasks = self.flow.split("->")
return self._get_sequential_awareness(agent_name, tasks)
def get_sequential_flow_structure(self) -> str:
"""
Gets the overall sequential flow structure information.
Returns:
str: A string describing the complete sequential flow structure.
"""
return self._get_sequential_flow_info()
def _run(
self,
task: str = None,
@ -319,6 +412,20 @@ class AgentRearrange(BaseSwarm):
agent = self.agents[agent_name]
# Add sequential awareness information for the agent
awareness_info = (
self._get_sequential_awareness(
agent_name, tasks
)
)
if awareness_info:
self.conversation.add(
"system", awareness_info
)
logger.info(
f"Added sequential awareness for {agent_name}: {awareness_info}"
)
current_task = agent.run(
task=self.conversation.get_str(),
img=img,
@ -368,11 +475,6 @@ class AgentRearrange(BaseSwarm):
Args:
task (str, optional): The task to execute. Defaults to None.
img (str, optional): Path to input image if required. Defaults to None.
device (str, optional): Computing device to use ('cpu' or 'gpu'). Defaults to "cpu".
device_id (int, optional): ID of specific device to use. Defaults to 1.
all_cores (bool, optional): Whether to use all CPU cores. Defaults to True.
all_gpus (bool, optional): Whether to use all available GPUs. Defaults to False.
no_use_clusterops (bool, optional): Whether to use clusterops. Defaults to False.
*args: Additional positional arguments passed to _run().
**kwargs: Additional keyword arguments passed to _run().
@ -419,10 +521,6 @@ class AgentRearrange(BaseSwarm):
tasks: List[str],
img: Optional[List[str]] = None,
batch_size: int = 10,
device: str = "cpu",
device_id: int = None,
all_cores: bool = True,
all_gpus: bool = False,
*args,
**kwargs,
) -> List[str]:
@ -456,10 +554,6 @@ class AgentRearrange(BaseSwarm):
self.run(
task=task,
img=img_path,
device=device,
device_id=device_id,
all_cores=all_cores,
all_gpus=all_gpus,
*args,
**kwargs,
)

@ -4,8 +4,6 @@ from tenacity import retry, stop_after_attempt, wait_exponential
from typing import Union, Callable, Any
from swarms import Agent
from swarms.utils.loguru_logger import initialize_logger
# from swarms.utils.lazy_loader import lazy_import_decorator
from swarms.utils.auto_download_check_packages import (
auto_check_and_download_package,
)

@ -1,6 +1,6 @@
import os
import traceback
from typing import List, Optional
from typing import List, Literal, Optional
from dotenv import load_dotenv
from loguru import logger
@ -202,7 +202,6 @@ class SwarmRouterConfig(BaseModel):
def reasoning_agent_run(
self,
task: str,
img: Optional[str] = None,
name: str = None,
@ -257,6 +256,11 @@ class AutoSwarmBuilder:
generate_router_config: bool = False,
interactive: bool = False,
max_tokens: int = 8000,
execution_type: Literal[
"return-agents",
"execute-swarm-router",
"return-agent-configurations",
] = "return-agents",
):
"""Initialize the AutoSwarmBuilder.
@ -277,6 +281,7 @@ class AutoSwarmBuilder:
self.generate_router_config = generate_router_config
self.interactive = interactive
self.max_tokens = max_tokens
self.execution_type = execution_type
self.conversation = Conversation()
self.reliability_check()

@ -1,5 +1,4 @@
import os
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import lru_cache
from typing import Dict, Optional, Tuple
@ -13,6 +12,8 @@ from swarms.utils.history_output_formatter import (
history_output_formatter,
)
from swarms.structs.swarm_id import swarm_id
class EvaluationError(Exception):
"""Base exception for evaluation-related errors."""
@ -32,13 +33,6 @@ class AggregationError(EvaluationError):
pass
def swarm_id() -> str:
"""
Generate a unique ID for the swarm.
"""
return str(uuid.uuid4())
# Define evaluation dimensions and their evaluation goals
EVAL_DIMENSIONS: Dict[str, str] = {
"accuracy": """Conduct a rigorous factual accuracy assessment of the model's response:

@ -169,7 +169,7 @@ class AgentValidator:
)
class AgentLoader:
class CSVAgentLoader:
"""Class to manage agents through various file formats with type safety and high performance"""
def __init__(

@ -1,6 +1,10 @@
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, List, Optional, Union
from swarms.prompts.multi_agent_collab_prompt import (
MULTI_AGENT_COLLAB_PROMPT,
)
from swarms.structs.agent import Agent
from swarms.structs.agent_rearrange import AgentRearrange
from swarms.utils.loguru_logger import initialize_logger
@ -11,17 +15,24 @@ logger = initialize_logger(log_folder="sequential_workflow")
class SequentialWorkflow:
"""
A class that orchestrates the execution of a sequence of agents in a defined workflow.
Args:
name (str, optional): The name of the workflow. Defaults to "SequentialWorkflow".
description (str, optional): A description of the workflow. Defaults to "Sequential Workflow, where agents are executed in a sequence."
agents (List[Agent], optional): A list of agents that will be part of the workflow. Defaults to an empty list.
max_loops (int, optional): The maximum number of times to execute the workflow. Defaults to 1.
output_type (OutputType, optional): The format of the output from the workflow. Defaults to "dict".
shared_memory_system (callable, optional): A callable for managing shared memory between agents. Defaults to None.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Orchestrates the execution of a sequence of agents in a defined workflow.
This class enables the construction and execution of a workflow where multiple agents
(or callables) are executed in a specified order, passing tasks and optional data
through the chain. It supports both synchronous and asynchronous execution, as well as
batched and concurrent task processing.
Attributes:
id (str): Unique identifier for the workflow instance.
name (str): Human-readable name for the workflow.
description (str): Description of the workflow's purpose.
agents (List[Union[Agent, Callable]]): List of agents or callables to execute in sequence.
max_loops (int): Maximum number of times to execute the workflow.
output_type (OutputType): Format of the output from the workflow.
shared_memory_system (callable): Optional callable for managing shared memory between agents.
multi_agent_collab_prompt (bool): Whether to append a collaborative prompt to each agent.
flow (str): String representation of the agent execution order.
agent_rearrange (AgentRearrange): Internal helper for managing agent execution.
Raises:
ValueError: If the agents list is None or empty, or if max_loops is set to 0.
@ -32,13 +43,33 @@ class SequentialWorkflow:
id: str = "sequential_workflow",
name: str = "SequentialWorkflow",
description: str = "Sequential Workflow, where agents are executed in a sequence.",
agents: List[Union[Agent, Callable]] = [],
agents: List[Union[Agent, Callable]] = None,
max_loops: int = 1,
output_type: OutputType = "dict",
shared_memory_system: callable = None,
multi_agent_collab_prompt: bool = True,
team_awareness: bool = False,
*args,
**kwargs,
):
"""
Initialize a SequentialWorkflow instance.
Args:
id (str, optional): Unique identifier for the workflow. Defaults to "sequential_workflow".
name (str, optional): Name of the workflow. Defaults to "SequentialWorkflow".
description (str, optional): Description of the workflow. Defaults to a standard description.
agents (List[Union[Agent, Callable]], optional): List of agents or callables to execute in sequence.
max_loops (int, optional): Maximum number of times to execute the workflow. Defaults to 1.
output_type (OutputType, optional): Output format for the workflow. Defaults to "dict".
shared_memory_system (callable, optional): Callable for shared memory management. Defaults to None.
multi_agent_collab_prompt (bool, optional): If True, appends a collaborative prompt to each agent.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Raises:
ValueError: If the agents list is None or empty, or if max_loops is set to 0.
"""
self.id = id
self.name = name
self.description = description
@ -46,6 +77,8 @@ class SequentialWorkflow:
self.max_loops = max_loops
self.output_type = output_type
self.shared_memory_system = shared_memory_system
self.multi_agent_collab_prompt = multi_agent_collab_prompt
self.team_awareness = team_awareness
self.reliability_check()
self.flow = self.sequential_flow()
@ -57,12 +90,41 @@ class SequentialWorkflow:
flow=self.flow,
max_loops=self.max_loops,
output_type=self.output_type,
team_awareness=self.team_awareness,
*args,
**kwargs,
)
def reliability_check(self):
"""
Validates the workflow configuration and prepares agents for execution.
Raises:
ValueError: If the agents list is None or empty, or if max_loops is set to 0.
"""
if self.agents is None or len(self.agents) == 0:
raise ValueError("Agents list cannot be None or empty")
if self.max_loops == 0:
raise ValueError("max_loops cannot be 0")
if self.multi_agent_collab_prompt is True:
for agent in self.agents:
agent.system_prompt += MULTI_AGENT_COLLAB_PROMPT
logger.info(
f"Sequential Workflow Name: {self.name} is ready to run."
)
def sequential_flow(self):
# Only create flow if agents exist
"""
Constructs a string representation of the agent execution order.
Returns:
str: A string showing the order of agent execution (e.g., "AgentA -> AgentB -> AgentC").
Returns an empty string if no valid agent names are found.
"""
if self.agents:
# Create flow by joining agent names with arrows
agent_names = []
for agent in self.agents:
try:
@ -91,15 +153,6 @@ class SequentialWorkflow:
return flow
def reliability_check(self):
if self.agents is None or len(self.agents) == 0:
raise ValueError("Agents list cannot be None or empty")
if self.max_loops == 0:
raise ValueError("max_loops cannot be 0")
logger.info("Checks completed; your swarm is ready.")
def run(
self,
task: str,
@ -113,12 +166,10 @@ class SequentialWorkflow:
Args:
task (str): The task for the agents to execute.
img (Optional[str]): An optional image input for the agents.
device (str): The device to use for the agents to execute. Defaults to "cpu".
all_cores (bool): Whether to utilize all CPU cores. Defaults to False.
all_gpus (bool): Whether to utilize all available GPUs. Defaults to False.
device_id (int): The specific device ID to use for execution. Defaults to 0.
no_use_clusterops (bool): Whether to avoid using cluster operations. Defaults to True.
img (Optional[str], optional): An optional image input for the agents.
imgs (Optional[List[str]], optional): Optional list of images for the agents.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
str: The final result after processing through all agents.
@ -127,14 +178,11 @@ class SequentialWorkflow:
ValueError: If the task is None or empty.
Exception: If any error occurs during task execution.
"""
try:
# prompt = f"{MULTI_AGENT_COLLAB_PROMPT}\n\n{task}"
return self.agent_rearrange.run(
task=task,
img=img,
# imgs=imgs,
# *args,
# **kwargs,
)
except Exception as e:
@ -144,6 +192,17 @@ class SequentialWorkflow:
raise e
def __call__(self, task: str, *args, **kwargs):
"""
Allows the SequentialWorkflow instance to be called as a function.
Args:
task (str): The task for the agents to execute.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
str: The final result after processing through all agents.
"""
return self.run(task, *args, **kwargs)
def run_batched(self, tasks: List[str]) -> List[str]:
@ -157,7 +216,7 @@ class SequentialWorkflow:
List[str]: A list of final results after processing through all agents.
Raises:
ValueError: If tasks is None or empty.
ValueError: If tasks is None, empty, or contains non-string elements.
Exception: If any error occurs during task execution.
"""
if not tasks or not all(
@ -186,7 +245,7 @@ class SequentialWorkflow:
str: The final result after processing through all agents.
Raises:
ValueError: If task is None or empty.
ValueError: If task is None or not a string.
Exception: If any error occurs during task execution.
"""
if not task or not isinstance(task, str):
@ -211,7 +270,7 @@ class SequentialWorkflow:
List[str]: A list of final results after processing through all agents.
Raises:
ValueError: If tasks is None or empty.
ValueError: If tasks is None, empty, or contains non-string elements.
Exception: If any error occurs during task execution.
"""
if not tasks or not all(
@ -222,7 +281,9 @@ class SequentialWorkflow:
)
try:
with ThreadPoolExecutor() as executor:
with ThreadPoolExecutor(
max_workers=os.cpu_count()
) as executor:
results = [
executor.submit(self.agent_rearrange.run, task)
for task in tasks

@ -0,0 +1,5 @@
from uuid import uuid4
def swarm_id():
return f"swarm-{uuid4().hex}"

@ -8,14 +8,11 @@ from swarms.structs.conversation import Conversation
from swarms.utils.history_output_formatter import (
HistoryOutputType,
)
from swarms.structs.swarm_id import swarm_id
logger = initialize_logger(log_folder="swarm_arange")
def swarm_id():
return uuid.uuid4().hex
class SwarmRearrange:
"""
A class representing a swarm of swarms for rearranging tasks.

@ -1,3 +1,8 @@
from swarms.utils.agent_loader_markdown import (
load_agent_from_markdown,
load_agents_from_markdown,
MarkdownAgentLoader,
)
from swarms.utils.check_all_model_max_tokens import (
check_all_model_max_tokens,
)
@ -20,12 +25,6 @@ from swarms.utils.file_processing import (
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
from swarms.utils.agent_loader_markdown import (
load_agent_from_markdown,
load_agents_from_markdown,
)
from swarms.utils.litellm_tokenizer import count_tokens
from swarms.utils.output_types import HistoryOutputType
from swarms.utils.parse_code import extract_code_from_markdown
@ -52,4 +51,5 @@ __all__ = [
"load_agent_from_markdown",
"load_agents_from_markdown",
"dynamic_auto_chunking",
"MarkdownAgentLoader",
]

@ -54,7 +54,7 @@ class MarkdownAgentConfig(BaseModel):
return v
class AgentLoader:
class MarkdownAgentLoader:
"""
Loader for creating agents from markdown files using Claude Code sub-agent format.
@ -444,7 +444,7 @@ def load_agent_from_markdown(file_path: str, **kwargs) -> "Agent":
"""
# Lazy import to avoid circular dependency
loader = AgentLoader()
loader = MarkdownAgentLoader()
return loader.load_single_agent(file_path, **kwargs)
@ -488,7 +488,7 @@ def load_agents_from_markdown(
"""
# Lazy import to avoid circular dependency
loader = AgentLoader()
loader = MarkdownAgentLoader()
return loader.load_agents_from_markdown(
file_paths,
concurrent=concurrent,

Loading…
Cancel
Save