[7.1.7] [cleanup] [no clusterops] [majorityvoting fix]

pull/754/head^2
Kye Gomez 2 months ago
parent fb494267eb
commit 928e1ecf45

@ -278,8 +278,6 @@ Use logging to monitor the behavior and performance of your models. The `loguru`
```python
from loguru import logger
logger.add("file.log", rotation="10 MB")
# Log model interactions
logger.info("Running task on Anthropic model")
response = model(task)
```
@ -2,216 +2,216 @@
The `MajorityVoting` module provides a mechanism for performing majority voting among a group of agents. Majority voting is a decision rule that selects the option which has the majority of votes. This is particularly useful in systems where multiple agents provide responses to a query, and the most common response needs to be identified as the final output.
## Architecture
```mermaid
graph TD
A[MajorityVoting System] --> B[Initialize Agents]
B --> C[Process Task]
C --> D{Execution Mode}
D --> E[Single Task]
D --> F[Batch Tasks]
D --> G[Concurrent Tasks]
D --> H[Async Tasks]
E --> I[Run Agents]
F --> I
G --> I
H --> I
I --> J[Collect Responses]
J --> K[Consensus Analysis]
K --> L{Consensus Agent?}
L -->|Yes| M[Use Consensus Agent]
L -->|No| N[Use Last Agent]
M --> O[Final Output]
N --> O
O --> P[Save Conversation]
```
### Key Concepts
- **Majority Voting**: A method to determine the most common response from a set of answers.
- **Agents**: Entities (e.g., models, algorithms) that provide responses to tasks or queries.
- **Output Parser**: A function that processes the responses from the agents before performing the majority voting (a minimal sketch follows this list).
- **Consensus Agent**: An optional agent that analyzes the responses from all agents to determine the final consensus.
- **Conversation History**: A record of all agent interactions and responses during the voting process.
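
For intuition, here is a minimal sketch of the default `majority_voting` output parser described in this document (the empty-list fallback matches the documented behavior; the actual implementation may differ):

```python
from collections import Counter
from typing import List

def majority_voting(answers: List[str]) -> str:
    """Return the most common answer; fall back when no answers were given."""
    if not answers:
        return "I don't know"
    return Counter(answers).most_common(1)[0][0]
```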
## Class Definition: `MajorityVoting`

### Parameters

| Parameter | Type | Description |
|------------------|----------------|-----------------------------------------------------------------------------|
| `name` | `str` | Name of the majority voting system. Default is `"MajorityVoting"`. |
| `description` | `str` | Description of the system. Default is `"A majority voting system for agents"`. |
| `agents` | `List[Agent]` | A list of agents to be used in the majority voting system. |
| `output_parser` | `Callable` | Function to parse agent outputs. Default is the `majority_voting` function. |
| `consensus_agent`| `Agent` | Optional agent for analyzing consensus among responses. |
| `autosave` | `bool` | Whether to autosave conversations. Default is `False`. |
| `verbose` | `bool` | Whether to enable verbose logging. Default is `False`. |
| `max_loops` | `int` | Maximum number of voting loops. Default is `1`. |

### Methods

#### `run(task: str, correct_answer: str, *args, **kwargs) -> List[Any]`

Runs the majority voting system for a single task.

**Parameters:**

- `task` (str): The task to be performed by the agents
- `correct_answer` (str): The correct answer for evaluation
- `*args`, `**kwargs`: Additional arguments

**Returns:**

- List[Any]: The conversation history as a string, including the majority vote

#### `batch_run(tasks: List[str], *args, **kwargs) -> List[Any]`

Runs multiple tasks in sequence.

**Parameters:**

- `tasks` (List[str]): List of tasks to be performed
- `*args`, `**kwargs`: Additional arguments

**Returns:**

- List[Any]: List of majority votes for each task

#### `run_concurrently(tasks: List[str], *args, **kwargs) -> List[Any]`

Runs multiple tasks concurrently using thread pooling.

**Parameters:**

- `tasks` (List[str]): List of tasks to be performed
- `*args`, `**kwargs`: Additional arguments

**Returns:**

- List[Any]: List of majority votes for each task

#### `run_async(tasks: List[str], *args, **kwargs) -> List[Any]`

Runs multiple tasks asynchronously using asyncio.

**Parameters:**

- `tasks` (List[str]): List of tasks to be performed
- `*args`, `**kwargs`: Additional arguments

**Returns:**

- List[Any]: List of majority votes for each task
## Usage Examples
### Example 1: Basic Single Task Execution with Modern LLMs

```python
from swarms import Agent, MajorityVoting

# Initialize multiple agents with different specialties
agents = [
    Agent(
        agent_name="Financial-Analysis-Agent",
        agent_description="Personal finance advisor focused on market analysis",
        system_prompt="You are a financial advisor specializing in market analysis and investment opportunities.",
        max_loops=1,
        model_name="gpt-4o"
    ),
    Agent(
        agent_name="Risk-Assessment-Agent",
        agent_description="Risk analysis and portfolio management expert",
        system_prompt="You are a risk assessment expert focused on evaluating investment risks and portfolio diversification.",
        max_loops=1,
        model_name="gpt-4o"
    ),
    Agent(
        agent_name="Tech-Investment-Agent",
        agent_description="Technology sector investment specialist",
        system_prompt="You are a technology investment specialist focused on AI, emerging tech, and growth opportunities.",
        max_loops=1,
        model_name="gpt-4o"
    )
]

consensus_agent = Agent(
    agent_name="Consensus-Agent",
    agent_description="Consensus agent focused on analyzing investment advice",
    system_prompt="You are a consensus agent focused on analyzing investment advice and providing a final answer.",
    max_loops=1,
    model_name="gpt-4o"
)

# Create majority voting system
majority_voting = MajorityVoting(
    name="Investment-Advisory-System",
    description="Multi-agent system for investment advice",
    agents=agents,
    verbose=True,
    consensus_agent=consensus_agent
)

# Run the analysis with majority voting
result = majority_voting.run(
    task="Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.",
    correct_answer=""  # Optional evaluation metric
)

print(result)
```
### Example 2: Batch Execution

```python
from swarms import Agent, MajorityVoting

# Initialize multiple agents with different specialties
agents = [
    Agent(
        agent_name="Financial-Analysis-Agent",
        agent_description="Personal finance advisor focused on market analysis",
        system_prompt="You are a financial advisor specializing in market analysis and investment opportunities.",
        max_loops=1,
        model_name="gpt-4o"
    ),
    Agent(
        agent_name="Risk-Assessment-Agent",
        agent_description="Risk analysis and portfolio management expert",
        system_prompt="You are a risk assessment expert focused on evaluating investment risks and portfolio diversification.",
        max_loops=1,
        model_name="gpt-4o"
    ),
    Agent(
        agent_name="Tech-Investment-Agent",
        agent_description="Technology sector investment specialist",
        system_prompt="You are a technology investment specialist focused on AI, emerging tech, and growth opportunities.",
        max_loops=1,
        model_name="gpt-4o"
    )
]

consensus_agent = Agent(
    agent_name="Consensus-Agent",
    agent_description="Consensus agent focused on analyzing investment advice",
    system_prompt="You are a consensus agent focused on analyzing investment advice and providing a final answer.",
    max_loops=1,
    model_name="gpt-4o"
)

# Create majority voting system
majority_voting = MajorityVoting(
    name="Investment-Advisory-System",
    description="Multi-agent system for investment advice",
    agents=agents,
    verbose=True,
    consensus_agent=consensus_agent
)

# Run the analysis with majority voting on a batch of tasks
results = majority_voting.batch_run(
    tasks=[
        "Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown."
    ],
    correct_answer=""  # Optional evaluation metric (assumed to be forwarded to each run)
)

print(results)
```
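
The concurrent and asynchronous entry points take the same list-of-tasks shape as `batch_run`. A brief sketch, reusing the `majority_voting` instance from the examples above:

```python
tasks = [
    "Compare ETFs vs. index funds for a $40k portfolio.",
    "List three diversification strategies for tech-heavy portfolios.",
]

# Thread-pooled execution
concurrent_results = majority_voting.run_concurrently(tasks)

# asyncio-based execution
async_results = majority_voting.run_async(tasks)
```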

@ -1,5 +1,31 @@
# MixtureOfAgents Class Documentation
## Architecture Overview
```mermaid
graph TD
A[Input Task] --> B[Initialize MixtureOfAgents]
B --> C[Reliability Check]
C --> D[Layer 1: Parallel Agent Execution]
D --> E[Layer 2: Sequential Processing]
E --> F[Layer 3: Parallel Agent Execution]
F --> G[Final Aggregator Agent]
G --> H[Output Response]
subgraph "Agent Layer Details"
I[Agent 1] --> J[Agent Results]
K[Agent 2] --> J
L[Agent N] --> J
end
subgraph "Processing Flow"
M[Previous Context] --> N[Current Task]
N --> O[Agent Processing]
O --> P[Aggregation]
P --> M
end
```
## Overview
The `MixtureOfAgents` class represents a mixture of agents operating within a swarm. The workflow of the swarm follows a parallel → sequential → parallel → final output agent process. This implementation is inspired by concepts discussed in the paper: [https://arxiv.org/pdf/2406.04692](https://arxiv.org/pdf/2406.04692).
@ -130,6 +156,89 @@ history = moe_swarm.run(task="Solve this problem.")
print(history)
```
### `reliability_check`
```python
def reliability_check(self) -> None:
```
#### Description
Performs validation checks on the `MixtureOfAgents` configuration to ensure all required components are properly set up, raising a `ValueError` if any check fails. A sketch follows the list below.
#### Validation Checks:
- Verifies reference agents are provided
- Validates aggregator agent exists
- Checks aggregator system prompt is set
- Ensures layers count is valid (> 0)
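
A schematic of these checks, assuming attribute names such as `reference_agents` and `aggregator_agent` (a sketch, not the class's actual source):

```python
def reliability_check(self) -> None:
    # Sketch of the documented checks; exact messages and attribute
    # names are assumptions.
    if not self.reference_agents:
        raise ValueError("No reference agents provided")
    if self.aggregator_agent is None:
        raise ValueError("No aggregator agent provided")
    if not self.aggregator_system_prompt:
        raise ValueError("No aggregator system prompt provided")
    if self.layers is None or self.layers < 1:
        raise ValueError("Layers must be greater than 0")
```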
### `_get_final_system_prompt`
```python
def _get_final_system_prompt(self, system_prompt: str, results: List[str]) -> str:
```
#### Description
Internal method that constructs a system prompt for subsequent layers by incorporating previous responses; a rough illustration follows the tables below.
#### Parameters
| Parameter | Type | Description |
|-----------|------|-------------|
| `system_prompt` | `str` | The initial system prompt |
| `results` | `List[str]` | List of previous responses |
#### Returns
| Type | Description |
|------|-------------|
| `str` | Combined system prompt with previous responses |
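
As a rough illustration of the combination this method performs (the exact formatting is internal to the class, so treat this as an assumption):

```python
from typing import List

def _get_final_system_prompt(self, system_prompt: str, results: List[str]) -> str:
    # Hypothetical sketch: number each prior response and append it as context
    context = "\n".join(
        f"[{i + 1}] {response}" for i, response in enumerate(results)
    )
    return f"{system_prompt}\n\nPrevious responses:\n{context}"
```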
### `run_batched`
```python
def run_batched(self, tasks: List[str]) -> List[str]:
```
#### Description
Processes multiple tasks sequentially, returning a list of responses.
#### Parameters
| Parameter | Type | Description |
|-----------|------|-------------|
| `tasks` | `List[str]` | List of tasks to process |
#### Returns
| Type | Description |
|------|-------------|
| `List[str]` | List of responses for each task |
### `run_concurrently`
```python
def run_concurrently(self, tasks: List[str]) -> List[str]:
```
#### Description
Processes multiple tasks concurrently using a ThreadPoolExecutor, optimizing for parallel execution.
#### Parameters
| Parameter | Type | Description |
|-----------|------|-------------|
| `tasks` | `List[str]` | List of tasks to process concurrently |
#### Returns
| Type | Description |
|------|-------------|
| `List[str]` | List of responses for each task |
## Detailed Explanation
### Initialization
@ -382,4 +491,113 @@ The `MixtureOfAgents` framework provides a solid foundation for further extensio
- **Advanced Agent Communication**: Enhancing the communication protocols between agents to allow for more sophisticated information exchange.
- **Integration with Other Frameworks**: Seamlessly integrating with other machine learning or data processing frameworks to leverage their capabilities within the swarm architecture.
In conclusion, the `MixtureOfAgents` class represents a versatile and efficient solution for orchestrating multi-agent systems, facilitating complex task execution through its structured and layered approach. By harnessing the power of parallel and sequential processing, it opens up new possibilities for tackling intricate problems across various domains.
## Additional Examples
### Example 4: Batch Processing
```python
from swarms import MixtureOfAgents, Agent
from swarm_models import OpenAIChat
# Initialize agents as in previous examples
director = Agent(
agent_name="Director",
system_prompt="Directs the tasks for the accountants",
llm=OpenAIChat(),
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="director.json",
)
accountant1 = Agent(
agent_name="Accountant1",
system_prompt="Prepares financial statements",
llm=OpenAIChat(),
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="accountant1.json",
)
accountant2 = Agent(
agent_name="Accountant2",
system_prompt="Audits financial records",
llm=OpenAIChat(),
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="accountant2.json",
)
# Initialize MixtureOfAgents
moe_swarm = MixtureOfAgents(
agents=[director, accountant1, accountant2],
final_agent=director
)
# Process multiple tasks in batch
tasks = [
"Analyze Q1 financial statements",
"Review tax compliance",
"Prepare budget forecast"
]
results = moe_swarm.run_batched(tasks)
for task, result in zip(tasks, results):
print(f"Task: {task}\nResult: {result}\n")
```
### Example 5: Concurrent Processing
```python
from swarms import MixtureOfAgents, Agent
from swarm_models import OpenAIChat
# Initialize agents as before
# ... agent initialization code ...
# Initialize MixtureOfAgents
moe_swarm = MixtureOfAgents(
agents=[director, accountant1, accountant2],
final_agent=director
)
# Process multiple tasks concurrently
tasks = [
"Generate monthly report",
"Audit expense claims",
"Update financial projections",
"Review investment portfolio"
]
results = moe_swarm.run_concurrently(tasks)
for task, result in zip(tasks, results):
print(f"Task: {task}\nResult: {result}\n")
```
## Advanced Features
### Context Preservation
The `MixtureOfAgents` class maintains context between iterations when running multiple loops. Each subsequent iteration receives the context from previous runs, allowing for more sophisticated and context-aware processing.
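
The loop described above might look roughly like this (a sketch under the assumption of a single context string and a hypothetical `run_layers` step; the class's actual state handling is richer):

```python
context = ""
for _ in range(max_loops):
    # Hypothetical single iteration: the prior output becomes the next context
    response = run_layers(task, previous_context=context)
    context = response
```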
### Asynchronous Processing
The class implements asynchronous processing internally using Python's `asyncio`, enabling efficient handling of concurrent operations and improved performance for complex workflows.
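
A minimal sketch of this fan-out pattern with `asyncio` (the agent-call wrapper below is an assumption, not the library's API):

```python
import asyncio

async def run_agent(agent, task: str) -> str:
    # Run a blocking agent call without stalling the event loop
    return await asyncio.to_thread(agent.run, task)

async def run_layer(agents, task: str) -> list[str]:
    # Fan one task out to every agent in a layer and gather the results
    return list(await asyncio.gather(*(run_agent(a, task) for a in agents)))
```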
### Telemetry and Logging
Built-in telemetry and logging capabilities help track agent performance and maintain detailed execution records (a hypothetical record shape follows this list):
- Automatic logging of agent outputs
- Structured data capture using Pydantic models
- JSON-formatted output options
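
```python
from datetime import datetime, timezone
from pydantic import BaseModel, Field

class AgentRunRecord(BaseModel):
    # Assumed fields; illustrates structured capture, not the real schema
    agent_name: str
    output: str
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
```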

@ -1,124 +0,0 @@
# MultiProcessWorkflow Documentation
The `MultiProcessWorkflow` class provides a framework for executing tasks concurrently using multiple processes. This class leverages Python's `multiprocessing` module to parallelize task execution, thereby enhancing performance and efficiency. It includes features such as automatic task retry on failure and optional autosaving of results. This documentation details the class, its parameters, attributes, methods, and usage examples.
## Class Definition
### `MultiProcessWorkflow`
## Parameters
| Parameter | Type | Default | Description |
|---------------|---------------------|---------|---------------------------------------------------------------|
| `max_workers` | `int` | `5` | The maximum number of workers to use for parallel processing. |
| `autosave` | `bool` | `True` | Flag indicating whether to automatically save the workflow. |
| `agents` | `Sequence[Agent]` | `None` | A list of Agent objects representing the workflow agents. |
| `*args` | `tuple` | | Additional positional arguments. |
| `**kwargs` | `dict` | | Additional keyword arguments. |
## Attributes
| Attribute | Type | Description |
|-----------------|---------------------|--------------------------------------------------------------|
| `max_workers` | `int` | The maximum number of workers to use for parallel processing.|
| `autosave` | `bool` | Flag indicating whether to automatically save the workflow. |
| `agents` | `Sequence[Agent]` | A list of Agent objects representing the workflow agents. |
## Methods
### `execute_task`
#### Description
The `execute_task` method executes a given task and handles any exceptions that may occur during execution. If agents are defined, it will execute the task using each agent in sequence.
#### Usage Example
```python
# Define a task (Task takes a name and a callable, as in the `run` example below)
task = Task(name="Simple Task", execute=lambda: "done")
# Execute the task
workflow = MultiProcessWorkflow()
result = workflow.execute_task(task)
print(result)
```
### `run`
#### Description
The `run` method executes the workflow by running the given task using multiple processes. It manages the task execution using a process pool and collects the results.
#### Usage Example
```python
from swarms.structs.multi_process_workflow import MultiProcessWorkflow
from swarms.structs.task import Task
from datetime import datetime
from time import sleep
# Define a simple task
def simple_task():
sleep(1)
return datetime.now()
# Create a task object
task = Task(
name="Simple Task",
execute=simple_task,
priority=1,
)
# Create a workflow with the task (agent1 and agent2 are assumed to be Agent instances created earlier)
workflow = MultiProcessWorkflow(max_workers=3, autosave=True, agents=[agent1, agent2])
# Run the workflow
results = workflow.run(task)
# Print the results
print(results)
```
## Detailed Functionality and Usage
### Initialization
When an instance of `MultiProcessWorkflow` is created, it initializes the following:
- **max_workers**: Sets the maximum number of processes that can run concurrently.
- **autosave**: Determines if the workflow results should be saved automatically.
- **agents**: Accepts a list of agents that will perform the tasks.
### Running Tasks
The `run` method performs the following steps (a schematic sketch follows the list):
1. **Initialize Results and Manager**: Creates a list to store results and a `Manager` to manage shared state between processes.
2. **Initialize Process Pool**: Creates a pool of worker processes.
3. **Submit Tasks**: Iterates over the agents, submitting tasks to the pool for execution and collecting the results.
4. **Wait for Completion**: Waits for all tasks to complete and collects the results.
5. **Return Results**: Returns the list of results from all executed tasks.
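
A schematic of these steps with `multiprocessing` (assuming agents and their outputs are picklable; the real class adds retries and logging):

```python
from multiprocessing import Manager, Pool

def run_sketch(agents, task, max_workers=5):
    with Manager() as manager:
        results = manager.list()                      # shared results list
        with Pool(processes=max_workers) as pool:     # worker pool
            pending = [
                pool.apply_async(agent.run, (task,))  # submit one task per agent
                for agent in agents
            ]
            for job in pending:                       # wait for completion
                results.append(job.get())
        return list(results)                          # return collected results
```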
### Autosave Task Result
Although the autosave functionality is mentioned in the parameters, it is not explicitly defined in the given code. The implementation for autosaving should be added based on the specific requirements of the application.
## Additional Information and Tips
- **Process Safety**: The use of `Manager` ensures that the list of results is managed safely across multiple processes.
- **Logging**: The class uses the `logger` module to log information about task execution, retries, and failures.
- **Error Handling**: The retry mechanism in the `execute_task` method helps in handling transient errors by attempting to re-execute failed tasks.
## References and Resources
For more information on multiprocessing in Python, refer to the following resources:
- [Python Multiprocessing Documentation](https://docs.python.org/3/library/multiprocessing.html)
- [Python Logging Documentation](https://docs.python.org/3/library/logging.html)
---
By following this detailed documentation, users can effectively understand and utilize the `MultiProcessWorkflow` class to execute tasks concurrently with multiple processes. The examples provided help in demonstrating the practical usage of the class.

@ -1,204 +0,0 @@
# MultiProcessWorkflow Documentation
The `MultiProcessWorkflow` class extends the `BaseWorkflow` to support parallel processing using multiple workers. This class is designed to efficiently execute tasks concurrently, leveraging the power of multi-processing to enhance performance and scalability.
### Key Concepts
- **Parallel Processing**: Utilizing multiple workers to execute tasks concurrently.
- **Workflow Management**: Handling the execution of tasks in a structured workflow.
- **Agents**: Entities responsible for executing tasks.
## Attributes
### Arguments
| Argument | Type | Default | Description |
|--------------|---------------------|---------|-------------|
| `max_workers`| `int` | `5` | The maximum number of workers to use for parallel processing. |
| `autosave` | `bool` | `True` | Flag indicating whether to automatically save the workflow. |
| `agents` | `Sequence[Agent]` | `None` | A list of agents participating in the workflow. |
| `*args` | | | Additional positional arguments. |
| `**kwargs` | | | Additional keyword arguments. |
### Attributes
| Attribute | Type | Description |
|--------------|---------------------|-------------|
| `max_workers`| `int` | The maximum number of workers to use for parallel processing. |
| `autosave` | `bool` | Flag indicating whether to automatically save the workflow. |
| `agents` | `Sequence[Agent]` | A list of agents participating in the workflow. |
## Methods
### __init__
Initializes the `MultiProcessWorkflow` with the given parameters.
**Examples:**
```python
from swarms.structs.agent import Agent
from swarms.structs.task import Task
from swarms.structs.multi_process_workflow import MultiProcessWorkflow
agents = [Agent(name="Agent 1"), Agent(name="Agent 2")]
tasks = [Task(name="Task 1", execute=lambda: "result1"), Task(name="Task 2", execute=lambda: "result2")]
workflow = MultiProcessWorkflow(max_workers=3, agents=agents, tasks=tasks)
```
### execute_task
Executes a task and handles exceptions.
**Arguments:**
| Parameter | Type | Description |
|-----------|------|-------------|
| `task` | `str` | The task to execute. |
| `*args` | | Additional positional arguments for the task execution. |
| `**kwargs`| | Additional keyword arguments for the task execution. |
**Returns:**
| Return Type | Description |
|-------------|-------------|
| `Any` | The result of the task execution. |
**Examples:**
```python
result = workflow.execute_task(task="Sample Task")
print(result)
```
### run
Runs the workflow.
**Arguments:**
| Parameter | Type | Description |
|-----------|------|-------------|
| `task` | `str` | The task to run. |
| `*args` | | Additional positional arguments for the task execution. |
| `**kwargs`| | Additional keyword arguments for the task execution. |
**Returns:**
| Return Type | Description |
|-------------|-------------|
| `List[Any]` | The results of all executed tasks. |
**Examples:**
```python
results = workflow.run(task="Sample Task")
print(results)
```
### Additional Examples
#### Example 1: Simple Task Execution
```python
from swarms import Agent, Task, MultiProcessWorkflow, OpenAIChat
from datetime import datetime
from time import sleep
import os
from dotenv import load_dotenv
# Load the environment variables
load_dotenv()
# Define a function to be used as the action
def my_action():
print("Action executed")
# Define a function to be used as the condition
def my_condition():
print("Condition checked")
return True
# Create an agent
agent = Agent(
llm=OpenAIChat(openai_api_key=os.environ["OPENAI_API_KEY"]),
max_loops=1,
dashboard=False,
)
# Create a task
task = Task(
description=(
"Generate a report on the top 3 biggest expenses for small"
" businesses and how businesses can save 20%"
),
agent=agent,
)
# Create a workflow with the task
workflow = MultiProcessWorkflow(tasks=[task])
# Run the workflow
results = workflow.run(task)
print(results)
```
#### Example 2: Workflow with Multiple Agents
```python
from swarms import Agent, Task, MultiProcessWorkflow, OpenAIChat
# Define tasks
def task1():
return "Task 1 result"
def task2():
return "Task 2 result"
# Create agents
agent1 = Agent(name="Agent 1", llm=OpenAIChat())
agent2 = Agent(name="Agent 2", llm=OpenAIChat())
# Create tasks
task_1 = Task(name="Task 1", execute=task1)
task_2 = Task(name="Task 2", execute=task2)
# Create a workflow
workflow = MultiProcessWorkflow(agents=[agent1, agent2], tasks=[task_1, task_2])
# Run the workflow
results = workflow.run(task="Example Task")
print(results)
```
#### Example 3: Customizing Max Workers
```python
from swarms import Agent, Task, MultiProcessWorkflow, OpenAIChat
# Define a task
def example_task():
return "Task result"
# Create an agent
agent = Agent(name="Agent 1", llm=OpenAIChat())
# Create a task
task = Task(name="Example Task", execute=example_task)
# Create a workflow with custom max workers
workflow = MultiProcessWorkflow(max_workers=10, agents=[agent], tasks=[task])
# Run the workflow
results = workflow.run(task="Example Task")
print(results)
```
## Summary
The `MultiProcessWorkflow` class provides a powerful framework for managing and executing tasks using multiple workers. With support for parallel processing, customizable workflows, and detailed logging, it is an ideal tool for complex task execution scenarios. This class enhances performance and scalability, making it suitable for a wide range of applications that require efficient task management.

@ -28,5 +28,4 @@ agent = Agent(
agent.run(
"Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.",
all_cores=True,
)

@ -0,0 +1,388 @@
import os
from dataclasses import dataclass
from typing import Tuple
from litellm import completion
from loguru import logger
from swarms import Agent
EXTRACTION_PROMPT = """
You are a specialized Chart2Table extraction agent that converts visual charts into precise textual descriptions.
Output Format:
[Chart Type]
Type: {bar|line|pie|scatter|combination}
Title: {chart title}
X-Axis: {label and scale}
Y-Axis: {label and scale}
[Data Series]
Name: {series name}
Values: {comma-separated list of values}
{repeat for each series}
[Annotations]
- {list any markers, gridlines, legends}
- {note any data gaps or anomalies}
Guidelines:
1. Maintain exact numerical precision
2. List ALL data points in order
3. Note any gaps, outliers or special patterns
4. Describe axes scales (linear/log) and units
5. Include legends and series names verbatim
6. Note any data point annotations or markers
7. Describe chart elements spatially (top-left, center, etc)
8. Include color and style information if relevant
9. Note relationships between multiple series
10. Flag any data quality or readability issues"""
REFORMULATION_PROMPT = """You are an Answer Reformulation specialist that breaks down complex analytical statements into atomic, verifiable claims.
Output Format:
[Core Claims]
1. {single fact with exact numbers}
2. {another atomic fact}
{continue for all core claims}
[Supporting Context]
1. {relevant context that supports core claims}
2. {additional contextual information}
{continue for all context}
[Assumptions]
1. {implicit assumption made}
2. {another assumption}
{continue for all assumptions}
Guidelines:
1. Each claim must be independently verifiable
2. Use exact numbers, never round or approximate
3. Split compound statements into atomic facts
4. Make implicit comparisons explicit
5. Note temporal relationships clearly
6. Include units with all measurements
7. Flag any uncertainty or approximations
8. Note data source limitations
9. Preserve calculation steps
10. Maintain logical dependencies"""
CAPTIONING_PROMPT = """You are an Entity Captioning specialist that generates rich contextual descriptions of chart elements.
Output Format:
[Data Points]
{x,y}: {detailed description of point significance}
{continue for key points}
[Trends]
- {description of overall pattern}
- {notable sub-patterns}
{continue for all trends}
[Relationships]
- {correlation between variables}
- {causation if evident}
{continue for all relationships}
[Context]
- {broader context for interpretation}
- {relevant external factors}
{continue for all context}
Guidelines:
1. Describe both local and global patterns
2. Note statistical significance of changes
3. Identify cyclic or seasonal patterns
4. Flag outliers and anomalies
5. Compare relative magnitudes
6. Note rate of change patterns
7. Describe distribution characteristics
8. Highlight key inflection points
9. Note data clustering patterns
10. Include domain-specific insights"""
PREFILTER_PROMPT = """You are a Pre-filtering specialist that identifies relevant chart elements for verification.
Output Format:
[Critical Elements]
1. {element}: Score {0-10}
Evidence: {why this supports claims}
{continue for all relevant elements}
[Supporting Elements]
1. {element}: Score {0-10}
Context: {how this adds context}
{continue for all supporting elements}
[Relevance Chain]
1. {claim} -> {element} -> {evidence}
{continue for all connections}
Guidelines:
1. Score relevance 0-10 with detailed rationale
2. Build explicit evidence chains
3. Note both direct and indirect support
4. Consider temporal relevance
5. Account for data relationships
6. Note confidence levels
7. Include contextual importance
8. Consider alternative interpretations
9. Note missing evidence
10. Explain filtering decisions"""
RERANK_PROMPT = """You are a Re-ranking specialist that orders evidence by strength and relevance.
Output Format:
[Primary Evidence]
1. {element} - Score: {0-10}
Strength: {detailed justification}
{continue for top evidence}
[Supporting Evidence]
1. {element} - Score: {0-10}
Context: {how this reinforces primary evidence}
{continue for supporting evidence}
[Evidence Chains]
1. {claim} -> {primary} -> {supporting} -> {conclusion}
{continue for all chains}
Guidelines:
1. Use explicit scoring criteria
2. Consider evidence independence
3. Note corroborating elements
4. Account for evidence quality
5. Consider contradictory evidence
6. Note confidence levels
7. Explain ranking decisions
8. Build complete evidence chains
9. Note gaps in evidence
10. Consider alternative interpretations"""
LOCALIZATION_PROMPT = """You are a Cell Localization specialist that precisely maps data to visual elements.
Output Format:
[Element Locations]
1. Type: {bar|line|point|label}
Position: {x1,y1,x2,y2}
Value: {associated data value}
Confidence: {0-10}
{continue for all elements}
[Spatial Relationships]
- {relative positions}
- {alignment patterns}
{continue for all relationships}
[Visual Context]
- {surrounding elements}
- {reference points}
{continue for context}
Guidelines:
1. Use normalized coordinates (0-1)
2. Note element boundaries precisely
3. Include confidence scores
4. Note spatial relationships
5. Account for overlapping elements
6. Consider chart type constraints
7. Note alignment patterns
8. Include reference points
9. Note visual hierarchies
10. Document occlusions"""
@dataclass
class ChartElement:
element_type: str
bbox: Tuple[float, float, float, float]
confidence: float
class VisionAPI:
def __init__(
self,
model_name: str = "gpt-4o",
max_tokens: int = 1000,
temperature: float = 0.5,
):
os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY")
self.model_name = model_name
self.max_tokens = max_tokens
self.temperature = temperature
def encode_image(self, img: str):
if img.startswith("http"):
return img
import base64
with open(img, "rb") as image_file:
encoded_string = base64.b64encode(
image_file.read()
).decode("utf-8")
return f"data:image/png;base64,{encoded_string}"
def run(self, task: str, img: str):
img = self.encode_image(img)
response = completion(
model=self.model_name,
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": task},
{
"type": "image_url",
"image_url": {"url": img},
},
],
}
],
max_tokens=self.max_tokens,
temperature=self.temperature,
)
return response.choices[0].message.content
class ChartCitor:
def __init__(
self,
model_name: str = "gpt-4o",
saved_state_path: str = "chartcitor_state.json",
max_retries: int = 3,
max_loops: int = 1,
):
logger.info(
f"Initializing ChartCitor with model {model_name}"
)
model = VisionAPI()
self.extraction_agent = Agent(
agent_name="Chart2Table-Agent",
system_prompt=EXTRACTION_PROMPT,
llm=model,
max_loops=1,
)
self.reformulation_agent = Agent(
agent_name="Answer-Reformulation-Agent",
system_prompt=REFORMULATION_PROMPT,
llm=model,
max_loops=1,
)
self.captioning_agent = Agent(
agent_name="Entity-Captioning-Agent",
system_prompt=CAPTIONING_PROMPT,
llm=model,
max_loops=1,
)
self.prefilter_agent = Agent(
agent_name="LLM-Prefilter-Agent",
system_prompt=PREFILTER_PROMPT,
llm=model,
max_loops=1,
)
self.rerank_agent = Agent(
agent_name="LLM-Rerank-Agent",
system_prompt=RERANK_PROMPT,
llm=model,
max_loops=1,
)
self.localization_agent = Agent(
agent_name="Cell-Localization-Agent",
system_prompt=LOCALIZATION_PROMPT,
llm=model,
max_loops=1,
)
def extract_table(self, chart_image: str) -> str:
logger.info("Extracting table from chart")
return self.extraction_agent.run(
"Extract and describe the data from this chart following the specified format.",
img=chart_image,
)
def reformulate_answer(
self, answer: str, table_data: str, chart_image: str
) -> str:
logger.info("Reformulating answer into atomic facts")
return self.reformulation_agent.run(
f"Break this answer into atomic facts:\n{answer}\n\nTable data:\n{table_data}",
img=chart_image,
)
def generate_captions(
self, table_data: str, chart_image: str
) -> str:
logger.info("Generating captions for chart elements")
return self.captioning_agent.run(
f"Generate descriptive captions for this data:\n{table_data}",
img=chart_image,
)
def retrieve_evidence(
self,
facts: str,
table_data: str,
captions: str,
chart_image: str,
) -> str:
logger.info("Retrieving supporting evidence")
filtered = self.prefilter_agent.run(
f"Identify relevant elements for:\nFacts:\n{facts}\n\nData:\n{table_data}\n\nCaptions:\n{captions}",
img=chart_image,
)
return self.rerank_agent.run(
f"Rank these elements by relevance:\n{filtered}\nFor facts:\n{facts}",
img=chart_image,
)
def localize_elements(
self, chart_image: str, evidence: str
) -> str:
logger.info("Localizing chart elements")
return self.localization_agent.run(
f"Describe the location of these elements:\n{evidence}",
img=chart_image,
)
def run(
self, chart_image: str, question: str, answer: str
) -> str:
logger.info(f"Processing question: {question}")
table_data = self.extract_table(chart_image)
facts = self.reformulate_answer(
answer, table_data, chart_image
)
captions = self.generate_captions(table_data, chart_image)
evidence = self.retrieve_evidence(
facts, table_data, captions, chart_image
)
citations = self.localize_elements(chart_image, evidence)
return f"""Analysis Results:
Facts:
{facts}
Evidence:
{evidence}
Visual Citations:
{citations}
"""
if __name__ == "__main__":
chartcitor = ChartCitor()
result = chartcitor.run(
chart_image="chart.png",
question="Analyze this chart of solana price and volume over time. What is the highest volume day?",
answer="203",
)
print(result)

@ -0,0 +1,603 @@
import concurrent.futures
import json
import os
import time
import uuid
from typing import Dict, List, Union
import PyPDF2
from pydantic import BaseModel, Field
from reportlab.lib.pagesizes import LETTER
from reportlab.pdfgen import canvas
from swarms import Agent
def user_id_generator():
return str(uuid.uuid4().hex)
def timestamp_generator():
    return time.strftime("%Y%m%d_%H%M%S")

class MortgageApplicationInput(BaseModel):
    user_id: str = Field(default_factory=user_id_generator)
    timestamp: str = Field(default_factory=timestamp_generator)
application_data: str = Field(
description="The raw text of the mortgage application."
)
class MortgageApplicationOutput(BaseModel):
user_id: str = Field(default_factory=user_id_generator)
input_data: MortgageApplicationInput = Field(
description="The input data for the mortgage application."
)
document_analysis: str = Field(
description="The structured analysis of the mortgage application."
)
risk_evaluation: str = Field(
description="The risk evaluation of the mortgage application."
)
underwriting_decision: str = Field(
description="The underwriting decision of the mortgage application."
)
def clean_markdown(text: str) -> str:
"""
Removes all markdown symbols from text.
Args:
text (str): Text containing markdown symbols
Returns:
str: Text with markdown symbols removed
"""
markdown_symbols = [
"```markdown",
"```",
"#",
"*",
"_",
"`",
">",
"-",
"+",
"[",
"]",
"(",
")",
"|",
]
cleaned_text = text
for symbol in markdown_symbols:
cleaned_text = cleaned_text.replace(symbol, "")
return cleaned_text.strip()
class MortgageUnderwritingSwarm:
def __init__(
self,
        user_id: Union[str, None] = None,
save_directory: str = "./autosave",
return_format: str = "pdf",
):
"""
Initialize the MortgageUnderwritingSwarm with the necessary Agents.
        Args:
            user_id (Union[str, None]): Optional applicant identifier; a fresh ID is generated when omitted.
            save_directory (str): Directory where intermediate results and final documents will be autosaved.
            return_format (str): Default output format, either "pdf" or "json".
"""
        self.user_id = user_id or user_id_generator()
self.save_directory = save_directory
self.return_format = return_format
os.makedirs(self.save_directory, exist_ok=True)
# -------------------------------
# 1) Document Analyzer Agent
# -------------------------------
self.document_agent = Agent(
agent_name="Document-Analyzer-Agent",
model_name="gpt-4o-mini",
max_loops=1,
streaming_on=True,
)
self.document_prompt = """
You are a highly experienced Mortgage Document Analysis Expert with deep knowledge of federal and state mortgage regulations. Your task is to:
1. Parse and extract key data from unstructured documents (PDF or text) while ensuring compliance with:
- Truth in Lending Act (TILA) requirements
- Real Estate Settlement Procedures Act (RESPA) guidelines
- Fair Credit Reporting Act (FCRA) standards
- Equal Credit Opportunity Act (ECOA) requirements
2. Validate data consistency and regulatory compliance for:
- Income verification (including all sources of income)
- Credit scores and credit history
- Property details and appraisal information
- Debt obligations and payment history
- Employment verification
- Asset documentation
- Identity verification documents
3. Highlight any discrepancies, red flags, or potential compliance violations, including:
- Inconsistencies in reported income vs documentation
- Suspicious patterns in bank statements
- Potential identity theft indicators
- Missing required regulatory disclosures
- Fair lending concerns
- Anti-money laundering (AML) red flags
4. Provide a comprehensive, well-structured summary that includes:
- All key findings organized by category
- Compliance checklist results
- Documentation completeness assessment
- Regulatory disclosure verification
- Quality control notes
5. Clearly indicate any missing or ambiguous information required by:
- Federal regulations
- State-specific requirements
- Agency guidelines (FHA, VA, Fannie Mae, Freddie Mac)
- Internal compliance policies
6. Format output in a standardized structure that:
- Facilitates automated compliance checks
- Enables clear audit trails
- Supports regulatory reporting requirements
- Can be easily consumed by subsequent agents
"""
# -------------------------------
# 2) Risk Evaluator Agent
# -------------------------------
self.risk_agent = Agent(
agent_name="Risk-Evaluator-Agent",
model_name="gpt-4o-mini",
max_loops=1,
streaming_on=True,
)
self.risk_prompt = """
You are an expert Risk Evaluator for mortgage applications with comprehensive knowledge of regulatory compliance. Your responsibilities:
1. Conduct thorough risk assessment in accordance with:
- Dodd-Frank Act requirements
- Consumer Financial Protection Bureau (CFPB) guidelines
- Federal Reserve Board regulations
- Agency-specific requirements (FHA, VA, Fannie Mae, Freddie Mac)
2. Evaluate key risk factors including:
- Debt-to-income ratio (DTI) compliance with QM rules
- Credit history analysis per FCRA guidelines
- Property valuation in line with USPAP standards
- Income stability and verification per agency requirements
- Assets and reserves adequacy
- Employment history and verification
- Occupancy risk assessment
- Property type and use restrictions
3. Calculate and assign risk scores:
- Overall application risk score (1-10 scale)
- Individual component risk scores
- Regulatory compliance risk assessment
- Fraud risk indicators
- Default risk probability
4. Identify and document:
- High-risk elements requiring additional scrutiny
- Potential regulatory compliance issues
- Required compensating factors
- Secondary market eligibility concerns
- Fair lending considerations
5. Recommend risk mitigation strategies:
- Additional documentation requirements
- Income/asset verification needs
- Compensating factor documentation
- Alternative qualification approaches
- Regulatory compliance remediation steps
6. Generate comprehensive risk analysis including:
- Detailed risk assessment findings
- Compliance verification results
- Supporting documentation requirements
- Clear justification for all conclusions
- Regulatory requirement adherence confirmation
"""
# -------------------------------
# 3) Mortgage Underwriter Agent
# -------------------------------
self.underwriter_agent = Agent(
agent_name="Mortgage-Underwriter-Agent",
model_name="gpt-4o-mini",
max_loops=1,
streaming_on=True,
)
self.underwriter_prompt = """
You are a seasoned Mortgage Underwriter with expertise in regulatory compliance and industry standards. Your role is to:
1. Make final underwriting decisions while ensuring compliance with:
- Qualified Mortgage (QM) and Ability-to-Repay (ATR) rules
- Fair lending laws (ECOA, FHA, HMDA)
- Agency guidelines (FHA, VA, Fannie Mae, Freddie Mac)
- State-specific lending requirements
- Internal credit policies and procedures
2. Review and synthesize:
- Document Analyzer findings
- Risk Evaluator assessments
- Compliance verification results
- Quality control checks
- Regulatory requirements
- Secondary market guidelines
3. Determine appropriate decision category:
- Approved
- Conditionally Approved (with specific conditions)
- Denied (with detailed adverse action notice requirements)
- Counteroffer recommendations
- Alternative program suggestions
4. For all decisions, provide:
- Clear written justification
- Regulatory compliance confirmation
- Required disclosures identification
- Adverse action notices if required
- Fair lending analysis documentation
- Secondary market eligibility determination
5. For conditional approvals, specify:
- Required documentation
- Timeline requirements
- Regulatory compliance conditions
- Prior-to-funding conditions
- Post-closing requirements
- Quality control conditions
6. Generate comprehensive decision report including:
- Detailed underwriting analysis
- Compliance verification results
- Supporting documentation list
- Condition status tracking
- Regulatory requirement satisfaction
- Clear audit trail documentation
7. Ensure all decisions adhere to:
- Fair lending requirements
- Anti-discrimination laws
- UDAAP regulations
- State and federal disclosure requirements
- Agency and investor guidelines
- Internal policies and procedures
"""
# --------------------------------------------------------------------------
# Utility Methods
# --------------------------------------------------------------------------
def pdf_to_text(self, pdf_file_path: str) -> str:
"""
Converts a PDF file to a string by extracting its text content.
Args:
pdf_file_path (str): The path to the PDF file.
Returns:
str: The extracted text from the PDF.
"""
text_content = []
with open(pdf_file_path, "rb") as f:
reader = PyPDF2.PdfReader(f)
for page in reader.pages:
page_text = page.extract_text() or ""
text_content.append(page_text)
return "\n".join(text_content)
def autosave_result(
self, result_data: str, filename: str
) -> None:
"""
Autosaves intermediate or final results to a text file in the designated directory.
Args:
result_data (str): The data to be written to the file.
filename (str): The desired filename (without path).
"""
full_path = os.path.join(self.save_directory, filename)
with open(full_path, "w", encoding="utf-8") as file:
file.write(result_data)
def generate_pdf_report(
self, content: str, pdf_path: str
) -> None:
"""
Generates a simple PDF report from text content using ReportLab.
Args:
content (str): The textual content for the PDF.
pdf_path (str): Where to save the generated PDF.
"""
c = canvas.Canvas(pdf_path, pagesize=LETTER)
width, height = LETTER
# Simple text wrap by splitting lines
lines = clean_markdown(content).split("\n")
current_height = height - 50 # top margin
for line in lines:
# If the line is too long, wrap it manually (simple approach)
max_chars = 90 # approx number of characters per line for LETTER size
while len(line) > max_chars:
c.drawString(50, current_height, line[:max_chars])
line = line[max_chars:]
current_height -= 15 # line spacing
c.drawString(50, current_height, line)
current_height -= 15
# Add a new page if we go beyond the margin
if current_height <= 50:
c.showPage()
current_height = height - 50
c.save()
# --------------------------------------------------------------------------
# Core Processing Methods
# --------------------------------------------------------------------------
def analyze_documents(self, document_data: str) -> str:
"""
Runs the Document Analyzer Agent on the given data.
Args:
document_data (str): Text representing the mortgage documents.
Returns:
str: Structured summary and highlights from the document analysis.
"""
prompt_input = (
self.document_prompt
+ "\n\n--- BEGIN DOCUMENTS ---\n"
+ document_data
+ "\n--- END DOCUMENTS ---\n"
)
print("Running Document Analyzer Agent...")
result = self.document_agent.run(prompt_input)
self.autosave_result(result, "document_analysis.txt")
return result
def evaluate_risk(self, document_analysis: str) -> str:
"""
Runs the Risk Evaluator Agent using the results from the Document Analyzer.
Args:
document_analysis (str): The structured analysis from the Document Analyzer.
Returns:
str: Risk analysis including risk score and explanation.
"""
prompt_input = (
self.risk_prompt
+ "\n\n--- DOCUMENT ANALYSIS OUTPUT ---\n"
+ document_analysis
+ "\n--- END ANALYSIS OUTPUT ---\n"
)
print("Running Risk Evaluator Agent...")
result = self.risk_agent.run(prompt_input)
self.autosave_result(result, "risk_evaluation.txt")
return result
def underwrite_mortgage(
self, document_analysis: str, risk_evaluation: str
) -> str:
"""
Runs the Mortgage Underwriter Agent to produce the final underwriting decision.
Args:
document_analysis (str): Output from the Document Analyzer.
risk_evaluation (str): Output from the Risk Evaluator.
Returns:
str: Final decision text with rationale.
"""
prompt_input = (
self.underwriter_prompt
+ "\n\n--- DOCUMENT ANALYSIS SUMMARY ---\n"
+ document_analysis
+ "\n--- RISK EVALUATION REPORT ---\n"
+ risk_evaluation
+ "\n--- END REPORTS ---\n"
)
print("Running Mortgage Underwriter Agent...")
result = self.underwriter_agent.run(prompt_input)
self.autosave_result(result, "underwriting_decision.txt")
return result
# --------------------------------------------------------------------------
# High-Level Workflow
# --------------------------------------------------------------------------
def run(
self,
application_data: str,
return_format: str = "pdf",
output_filename: str = "UnderwritingDecision",
) -> Union[str, Dict]:
"""
Processes a single mortgage application from documents to final underwriting decision.
Allows returning data in either PDF or JSON format.
Args:
            application_data (str): The text representation of the applicant's documents.
return_format (str): "pdf" or "json". Defaults to "pdf".
output_filename (str): Base filename (without extension) for the output file.
Returns:
Union[str, Dict]: If return_format="json", returns a dict with the final data.
If return_format="pdf", returns the path of the generated PDF.
"""
# Step 1: Document Analysis
doc_analysis = self.analyze_documents(application_data)
# Step 2: Risk Evaluation
risk_eval = self.evaluate_risk(doc_analysis)
# Step 3: Underwriting Decision
final_decision = self.underwrite_mortgage(
doc_analysis, risk_eval
)
# Prepare final content (text)
final_content = (
"---- Mortgage Underwriting Decision Report ----\n\n"
"DOCUMENT ANALYSIS:\n" + doc_analysis + "\n\n"
"RISK EVALUATION:\n" + risk_eval + "\n\n"
"FINAL UNDERWRITING DECISION:\n" + final_decision + "\n"
)
# Return JSON
if return_format.lower() == "json":
output_data = {
"document_analysis": doc_analysis,
"risk_evaluation": risk_eval,
"final_decision": final_decision,
}
json_path = os.path.join(
self.save_directory, f"{output_filename}.json"
)
with open(json_path, "w", encoding="utf-8") as jf:
json.dump(output_data, jf, indent=2)
return output_data
# Generate PDF
elif return_format.lower() == "pdf":
pdf_path = os.path.join(
self.save_directory, f"{output_filename}.pdf"
)
self.generate_pdf_report(final_content, pdf_path)
return pdf_path
else:
raise ValueError(
"Invalid return format. Choose either 'pdf' or 'json'."
)
    def run_concurrently(
        self,
        application_data: str,
        return_format: str = "pdf",
        output_filename: str = "UnderwritingDecision",
    ) -> List[Union[str, Dict]]:
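        """
        Runs the single-application workflow in a background thread pool and
        returns the collected results as a list.
        """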
with concurrent.futures.ThreadPoolExecutor(
max_workers=os.cpu_count()
) as executor:
futures = [
executor.submit(
self.run,
application_data,
return_format,
output_filename,
)
]
results = [
future.result()
for future in concurrent.futures.as_completed(futures)
]
return results
# --------------------------------------------------------------------------
# Batch Processing
# --------------------------------------------------------------------------
def runs_in_batch(
self,
list_of_application_data: List[str],
return_format: str = "pdf",
) -> List[Union[str, Dict]]:
"""
Processes multiple mortgage applications in a batch and returns the results as
either PDFs or JSON structures for each application.
Args:
list_of_application_data (List[str]): A list of string representations
of mortgage applications (e.g., raw text).
return_format (str): "pdf" or "json" format for the output files.
Returns:
List[Union[str, Dict]]: A list of outputs (either file paths to PDFs or JSON dicts).
"""
results = []
for idx, application_text in enumerate(
list_of_application_data, start=1
):
output_filename = f"UnderwritingDecision_{idx}"
print(f"\n--- Processing Application {idx} ---")
result = self.run(
application_data=application_text,
return_format=return_format,
output_filename=output_filename,
)
results.append(result)
return results
# --------------------------------------------------------------------------
# PDF/Document Conversion Helpers
# --------------------------------------------------------------------------
def convert_pdfs_to_texts(
self, pdf_paths: List[str]
) -> List[str]:
"""
Converts multiple PDFs into text.
Args:
pdf_paths (List[str]): A list of file paths to PDF documents.
Returns:
List[str]: A list of extracted text contents, one per PDF in the list.
"""
text_results = []
for pdf_path in pdf_paths:
print(f"Converting PDF to text: {pdf_path}")
text_data = self.pdf_to_text(pdf_path)
text_results.append(text_data)
return text_results
# ------------------------------------------------------------------------------
# Example Usage (As a Script)
# ------------------------------------------------------------------------------
if __name__ == "__main__":
# Sample mortgage application text (or read from PDF, DB, etc.)
sample_application_data = """
Mortgage Application Data:
Applicant Name: Jane Doe
DOB: 02/14/1985
SSN: 987-65-4321
Annual Income: $95,000
Credit Score: 690
Outstanding Debt: $40,000
Property Appraisal: $300,000
Loan Amount Request: $270,000
Employment: 3+ years at current employer
Bank Statements & Tax Returns: Provided for the last year
Extra Notes: Some minor late payments on credit cards in 2020.
"""
# Initialize the swarm
swarm = MortgageUnderwritingSwarm(
save_directory="./autosave_results"
)
# 1) Convert PDF to text if needed
# pdf_text = swarm.pdf_to_text("path_to_some_pdf.pdf")
# Or convert multiple PDFs in batch
# texts_from_pdfs = swarm.convert_pdfs_to_texts(["file1.pdf", "file2.pdf"])
# 2) Process a single application
final_pdf_path = swarm.run(
application_data=sample_application_data,
return_format="pdf", # or "json"
output_filename="JaneDoe_UnderwritingDecision",
)
print(f"PDF generated at: {final_pdf_path}")
# 3) Process multiple applications in a batch
# multiple_apps = [sample_application_data, sample_application_data] # Pretend we have 2
# batch_results = swarm.runs_in_batch(
# multiple_apps,
# return_format="json"
# )
# Each item in batch_results will be a JSON dict if return_format="json".
# print("\nBatch Processing Results (JSON):")
# for result in batch_results:
# print(json.dumps(result, indent=2))
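# 4) Optional: run through the thread-pool wrapper instead.
# A minimal sketch using the run_concurrently method defined above;
# it wraps a single run call and returns a one-element list.
# concurrent_results = swarm.run_concurrently(
#     sample_application_data,
#     return_format="json",
#     output_filename="JaneDoe_Concurrent",
# )
# print(concurrent_results)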

@ -0,0 +1,156 @@
import os
from swarms import Agent
from dotenv import load_dotenv
from swarm_models import OpenAIChat
load_dotenv()
model = OpenAIChat(
model_name="deepseek-ai/DeepSeek-R1-Distill-Llama-70B-free",
openai_api_key=os.getenv("TOGETHER_API_KEY"),
base_url="https://api.together.xyz/v1",
)
# Define system prompts for reasoning agents
THINKING_AGENT_PROMPT = """You are a sophisticated analytical and strategic thinking agent focused on deep problem analysis and solution design.
Your core capabilities include:
1. Comprehensive Problem Analysis
- Break down complex problems into constituent elements
- Map relationships and dependencies between components
- Identify root causes and underlying patterns
- Consider historical context and precedents
2. Multi-Perspective Evaluation
- Examine issues from multiple stakeholder viewpoints
- Consider short-term and long-term implications
- Evaluate social, economic, technical, and ethical dimensions
- Challenge assumptions and identify potential biases
3. Risk Assessment and Mitigation
- Conduct thorough risk analysis across scenarios
- Identify potential failure modes and edge cases
- Develop contingency plans and mitigation strategies
- Assess probability and impact of various outcomes
4. Strategic Solution Development
- Generate multiple solution approaches
- Evaluate trade-offs between different strategies
- Consider resource constraints and limitations
- Design scalable and sustainable solutions
5. Decision Framework Creation
- Establish clear evaluation criteria
- Weight competing priorities appropriately
- Create structured decision matrices
- Document reasoning and key decision factors
6. Systems Thinking
- Map interconnections between system elements
- Identify feedback loops and cascade effects
- Consider emergent properties and behaviors
- Account for dynamic system evolution
Your output should always include:
- Clear articulation of your analytical process
- Key assumptions and their justification
- Potential risks and mitigation strategies
- Multiple solution options with pros/cons
- Specific recommendations with supporting rationale
- Areas of uncertainty requiring further investigation
Focus on developing robust, well-reasoned strategies that account for complexity while remaining practical and actionable."""
ACTION_AGENT_PROMPT = """You are an advanced implementation and execution agent focused on turning strategic plans into concrete results.
Your core capabilities include:
1. Strategic Implementation Planning
- Break down high-level strategies into specific actions
- Create detailed project roadmaps and timelines
- Identify critical path dependencies
- Establish clear milestones and success metrics
- Design feedback and monitoring mechanisms
2. Resource Optimization
- Assess resource requirements and constraints
- Optimize resource allocation and scheduling
- Identify efficiency opportunities
- Plan for scalability and flexibility
- Manage competing priorities effectively
3. Execution Management
- Develop detailed implementation procedures
- Create clear operational guidelines
- Establish quality control measures
- Design progress tracking systems
- Build in review and adjustment points
4. Risk Management
- Implement specific risk mitigation measures
- Create early warning systems
- Develop contingency procedures
- Establish fallback positions
- Monitor risk indicators
5. Stakeholder Management
- Identify key stakeholders and their needs
- Create communication plans
- Establish feedback mechanisms
- Manage expectations effectively
- Build support and buy-in
6. Continuous Improvement
- Monitor implementation effectiveness
- Gather and analyze performance data
- Identify improvement opportunities
- Implement iterative enhancements
- Document lessons learned
Your output should always include:
- Detailed action plans with specific steps
- Resource requirements and allocation plans
- Timeline with key milestones
- Success metrics and monitoring approach
- Risk mitigation procedures
- Communication and stakeholder management plans
- Quality control measures
- Feedback and adjustment mechanisms
Focus on practical, efficient, and effective implementation while maintaining high quality standards and achieving desired outcomes."""
# Initialize the thinking agent
thinking_agent = Agent(
agent_name="Strategic-Thinker",
agent_description="Deep analysis and strategic planning agent",
system_prompt=THINKING_AGENT_PROMPT,
max_loops=1,
llm=model,
dynamic_temperature_enabled=True,
)
# Initialize the action agent
action_agent = Agent(
agent_name="Action-Executor",
agent_description="Practical implementation and execution agent",
system_prompt=ACTION_AGENT_PROMPT,
max_loops=1,
model_name="gpt-4o",
dynamic_temperature_enabled=True,
)
def run_reasoning_duo(task: str):
# Step 1: Thinking Agent
thinking_result = thinking_agent.run(task)
# Step 2: Action Agent
action_result = action_agent.run(
f"From {thinking_agent.agent_name}: {thinking_result}"
)
return action_result
if __name__ == "__main__":
run_reasoning_duo("What is the best way to invest $1000?")

@ -0,0 +1,170 @@
from loguru import logger
from swarms.structs.swarm_eval import (
SwarmEvaluator,
PRESET_DATASETS,
)
import os
from swarms import Agent
from dotenv import load_dotenv
from swarm_models import OpenAIChat
load_dotenv()
model = OpenAIChat(
model_name="deepseek-ai/DeepSeek-R1-Distill-Llama-70B-free",
openai_api_key=os.getenv("TOGETHER_API_KEY"),
base_url="https://api.together.xyz/v1",
)
# Define system prompts for reasoning agents
THINKING_AGENT_PROMPT = """You are a sophisticated analytical and strategic thinking agent focused on deep problem analysis and solution design.
Your core capabilities include:
1. Comprehensive Problem Analysis
- Break down complex problems into constituent elements
- Map relationships and dependencies between components
- Identify root causes and underlying patterns
- Consider historical context and precedents
2. Multi-Perspective Evaluation
- Examine issues from multiple stakeholder viewpoints
- Consider short-term and long-term implications
- Evaluate social, economic, technical, and ethical dimensions
- Challenge assumptions and identify potential biases
3. Risk Assessment and Mitigation
- Conduct thorough risk analysis across scenarios
- Identify potential failure modes and edge cases
- Develop contingency plans and mitigation strategies
- Assess probability and impact of various outcomes
4. Strategic Solution Development
- Generate multiple solution approaches
- Evaluate trade-offs between different strategies
- Consider resource constraints and limitations
- Design scalable and sustainable solutions
5. Decision Framework Creation
- Establish clear evaluation criteria
- Weight competing priorities appropriately
- Create structured decision matrices
- Document reasoning and key decision factors
6. Systems Thinking
- Map interconnections between system elements
- Identify feedback loops and cascade effects
- Consider emergent properties and behaviors
- Account for dynamic system evolution
Your output should always include:
- Clear articulation of your analytical process
- Key assumptions and their justification
- Potential risks and mitigation strategies
- Multiple solution options with pros/cons
- Specific recommendations with supporting rationale
- Areas of uncertainty requiring further investigation
Focus on developing robust, well-reasoned strategies that account for complexity while remaining practical and actionable."""
ACTION_AGENT_PROMPT = """You are an advanced implementation and execution agent focused on turning strategic plans into concrete results.
Your core capabilities include:
1. Strategic Implementation Planning
- Break down high-level strategies into specific actions
- Create detailed project roadmaps and timelines
- Identify critical path dependencies
- Establish clear milestones and success metrics
- Design feedback and monitoring mechanisms
2. Resource Optimization
- Assess resource requirements and constraints
- Optimize resource allocation and scheduling
- Identify efficiency opportunities
- Plan for scalability and flexibility
- Manage competing priorities effectively
3. Execution Management
- Develop detailed implementation procedures
- Create clear operational guidelines
- Establish quality control measures
- Design progress tracking systems
- Build in review and adjustment points
4. Risk Management
- Implement specific risk mitigation measures
- Create early warning systems
- Develop contingency procedures
- Establish fallback positions
- Monitor risk indicators
5. Stakeholder Management
- Identify key stakeholders and their needs
- Create communication plans
- Establish feedback mechanisms
- Manage expectations effectively
- Build support and buy-in
6. Continuous Improvement
- Monitor implementation effectiveness
- Gather and analyze performance data
- Identify improvement opportunities
- Implement iterative enhancements
- Document lessons learned
Your output should always include:
- Detailed action plans with specific steps
- Resource requirements and allocation plans
- Timeline with key milestones
- Success metrics and monitoring approach
- Risk mitigation procedures
- Communication and stakeholder management plans
- Quality control measures
- Feedback and adjustment mechanisms
Focus on practical, efficient, and effective implementation while maintaining high quality standards and achieving desired outcomes."""
# Initialize the thinking agent
thinking_agent = Agent(
agent_name="Strategic-Thinker",
agent_description="Deep analysis and strategic planning agent",
system_prompt=THINKING_AGENT_PROMPT,
max_loops=1,
llm=model,
dynamic_temperature_enabled=True,
)
class DeepSeekSwarm:
def __init__(self):
self.thinking_agent = thinking_agent
def run(self, task: str):
first_one = self.thinking_agent.run(task)
return self.thinking_agent.run(first_one)
if __name__ == "__main__":
# Initialize the swarm (replace with your actual multi-agent system)
swarm = DeepSeekSwarm()
# Initialize the evaluator with the swarm instance
evaluator = SwarmEvaluator(swarm)
logger.info("Starting evaluation for dataset: gsm8k")
# For demonstration, we use 4 concurrent workers, show progress, and save results.
results = evaluator.evaluate(
"gsm8k",
split="train",
config=PRESET_DATASETS["gsm8k"],
max_workers=os.cpu_count(),
max_retries=3,
show_progress=True,
output_file="gsm8k_results.txt",
)
logger.info(f"Results for gsm8k: {results}")

@ -0,0 +1,52 @@
from swarms import Agent, MajorityVoting
# Initialize multiple agents with different specialties
agents = [
Agent(
agent_name="Financial-Analysis-Agent",
agent_description="Personal finance advisor focused on market analysis",
system_prompt="You are a financial advisor specializing in market analysis and investment opportunities.",
max_loops=1,
model_name="gpt-4o"
),
Agent(
agent_name="Risk-Assessment-Agent",
agent_description="Risk analysis and portfolio management expert",
system_prompt="You are a risk assessment expert focused on evaluating investment risks and portfolio diversification.",
max_loops=1,
model_name="gpt-4o"
),
Agent(
agent_name="Tech-Investment-Agent",
agent_description="Technology sector investment specialist",
system_prompt="You are a technology investment specialist focused on AI, emerging tech, and growth opportunities.",
max_loops=1,
model_name="gpt-4o"
)
]
consensus_agent = Agent(
agent_name="Consensus-Agent",
agent_description="Consensus agent focused on analyzing investment advice",
system_prompt="You are a consensus agent focused on analyzing investment advice and providing a final answer.",
max_loops=1,
model_name="gpt-4o"
)
# Create majority voting system
majority_voting = MajorityVoting(
name="Investment-Advisory-System",
description="Multi-agent system for investment advice",
agents=agents,
verbose=True,
consensus_agent=consensus_agent
)
# Run the analysis with majority voting
result = majority_voting.run(
task="Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.",
correct_answer="" # Optional evaluation metric
)
print(result)
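# A minimal batch sketch, assuming batch_run forwards extra arguments
# (such as correct_answer) through to run for each task:
# tasks = [
#     "Rank three ETFs for long-term AI exposure.",
#     "Propose a diversified $40k allocation in markdown.",
# ]
# batch_results = majority_voting.batch_run(tasks, "")
# for batch_result in batch_results:
#     print(batch_result)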

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "7.1.2"
version = "7.1.7"
description = "Swarms - TGSC"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]
@ -73,7 +73,6 @@ docstring_parser = "0.16" # TODO:
tiktoken = "*"
networkx = "*"
aiofiles = "*"
clusterops = "*"
# chromadb = "*"
rich = "*"
numpy = "*"

@ -21,5 +21,4 @@ types-chardet>=5.0.4.6
mypy-protobuf>=3.0.0
pytest>=8.1.1
networkx
aiofiles
clusterops
aiofiles

@ -0,0 +1,18 @@
from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT,
)
# Initialize the agent
agent = Agent(
agent_name="Financial-Analysis-Agent",
agent_description="Personal finance advisor agent",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
max_loops=1,
model_name="ollama/llama2",
)
agent.run(
"Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.",
)

@ -75,7 +75,6 @@ from swarms.structs.swarming_architectures import (
staircase_swarm,
star_swarm,
)
from swarms.structs.task import Task
__all__ = [
@ -98,7 +97,6 @@ __all__ = [
"rearrange",
"RoundRobinSwarm",
"SequentialWorkflow",
"Task",
"MixtureOfAgents",
"GraphWorkflow",
"Node",

@ -49,9 +49,6 @@ from swarms.utils.data_to_text import data_to_text
from swarms.utils.file_processing import create_file_in_folder
from swarms.utils.formatter import formatter
from swarms.utils.pdf_to_text import pdf_to_text
from swarms.utils.wrapper_clusterop import (
exec_callable_with_clusterops,
)
from swarms.telemetry.main import log_agent_data
from swarms.agents.agent_print import agent_print
from swarms.utils.litellm_tokenizer import count_tokens
@ -760,6 +757,7 @@ class Agent:
is_last: Optional[bool] = False,
print_task: Optional[bool] = False,
generate_speech: Optional[bool] = False,
correct_answer: Optional[str] = None,
*args,
**kwargs,
) -> Any:
@ -858,6 +856,11 @@ class Agent:
# Convert to a str if the response is not a str
response = self.llm_output_parser(response)
# if correct_answer is not None:
# if correct_answer in response:
# logger.info("Correct answer found in response")
# # break
# Print
if self.streaming_on is True:
# self.stream_response(response)
@ -2465,14 +2468,6 @@ class Agent:
ValueError: If an invalid device is specified.
Exception: If any other error occurs during execution.
"""
device = device or self.device
device_id = device_id or self.device_id
all_cores = all_cores or self.all_cores
all_gpus = all_gpus or self.all_gpus
do_not_use_cluster_ops = (
do_not_use_cluster_ops or self.do_not_use_cluster_ops
)
if scheduled_run_date:
while datetime.now() < scheduled_run_date:
@ -2482,34 +2477,16 @@ class Agent:
try:
# If cluster ops disabled, run directly
if do_not_use_cluster_ops is True:
logger.info("Running without cluster operations")
return self._run(
task=task,
img=img,
*args,
**kwargs,
)
else:
return exec_callable_with_clusterops(
device=device,
device_id=device_id,
all_cores=all_cores,
all_gpus=all_gpus,
func=self._run,
task=task,
img=img,
*args,
**kwargs,
)
return self._run(
task=task,
img=img,
*args,
**kwargs,
)
except ValueError as e:
self._handle_run_error(e)
except Exception as e:
self._handle_run_error(e)
def handle_artifacts(
self, text: str, file_output_path: str, file_extension: str
) -> None:

@ -4,7 +4,6 @@ from typing import Any, Dict, List, Optional
from swarms.utils.formatter import formatter
from swarms.structs.agent import Agent
from swarms.structs.base_structure import BaseStructure
from swarms.structs.task import Task
from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger("base-workflow")
@ -43,7 +42,7 @@ class BaseWorkflow(BaseStructure):
def __init__(
self,
agents: List[Agent] = None,
task_pool: List[Task] = None,
task_pool: List[str] = None,
models: List[Any] = None,
*args,
**kwargs,
@ -69,8 +68,8 @@ class BaseWorkflow(BaseStructure):
def add_task(
self,
task: Task = None,
tasks: List[Task] = None,
task: str = None,
tasks: List[str] = None,
*args,
**kwargs,
):
@ -293,12 +292,6 @@ class BaseWorkflow(BaseStructure):
"green",
)
task = Task(
description=task,
agent=kwargs["agent"],
args=list(kwargs["args"]),
kwargs=kwargs["kwargs"],
)
self.tasks.append(task)
except Exception as error:
formatter.print_panel(
@ -306,48 +299,6 @@ class BaseWorkflow(BaseStructure):
)
raise error
def load_workflow_state(
self, filepath: str = None, **kwargs
) -> None:
"""
Loads the workflow state from a json file and restores the workflow state.
Args:
filepath (str): The path to load the workflow state from.
Examples:
>>> from swarm_models import OpenAIChat
>>> from swarms.structs import SequentialWorkflow
>>> llm = OpenAIChat(openai_api_key="")
>>> workflow = SequentialWorkflow(max_loops=1)
>>> workflow.add("What's the weather in miami", llm)
>>> workflow.add("Create a report on these metrics", llm)
>>> workflow.save_workflow_state("sequential_workflow_state.json")
>>> workflow.load_workflow_state("sequential_workflow_state.json")
"""
try:
filepath = filepath or self.restore_state_filepath
with open(filepath) as f:
state = json.load(f)
self.max_loops = state["max_loops"]
self.tasks = []
for task_state in state["tasks"]:
task = Task(
description=task_state["description"],
agent=task_state["agent"],
args=task_state["args"],
kwargs=task_state["kwargs"],
result=task_state["result"],
history=task_state["history"],
)
self.tasks.append(task)
except Exception as error:
formatter.print_panel(
f"Error loading workflow state: {error}",
)
def workflow_dashboard(self, **kwargs) -> None:
"""
Displays a dashboard for the workflow.

@ -8,7 +8,7 @@ from dataclasses import dataclass
import csv
from pathlib import Path
from enum import Enum
from swarms import Agent
from swarms.structs.agent import Agent
class ModelName(str, Enum):

@ -14,15 +14,6 @@ from swarms.utils.auto_download_check_packages import (
auto_check_and_download_package,
)
# Configure logging
logger.add(
"graphswarm.log",
rotation="500 MB",
retention="10 days",
level="INFO",
format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
)
class AgentOutput(BaseModel):
"""Structured output from an agent."""

@ -1,9 +1,12 @@
import concurrent.futures
from datetime import datetime
import os
from typing import Callable, List
from dotenv import load_dotenv
from loguru import logger
from pydantic import BaseModel, Field
from swarm_models import OpenAIChat
from swarms.structs.agent import Agent
@ -123,7 +126,7 @@ class GroupChat:
description: str = "A group chat for multiple agents",
agents: List[Agent] = [],
speaker_fn: SpeakerFunction = round_robin,
max_loops: int = 10,
max_loops: int = 1,
):
"""
Initialize the GroupChat.
@ -171,7 +174,9 @@ class GroupChat:
Previous messages: {self.get_full_chat_history()}
""" # Updated line
message = agent.run(context + prompt)
message = agent.run(
task=f"From {agent.agent_name}: {context} \n {prompt}"
)
return AgentResponse(
agent_name=agent.name,
role=agent.system_prompt,
@ -224,7 +229,7 @@ class GroupChat:
def run(self, task: str) -> ChatHistory:
"""
Run the group chat.
Run the group chat, feeding the context of previous turns into each new turn.
Args:
task (str): The initial message to start the chat.
@ -242,12 +247,22 @@ class GroupChat:
turn_number=turn, responses=[], task=task
)
# Get context from previous turns
context = self.get_full_chat_history()
# Combine task with context for agents
contextualized_task = (
f"{task}\n\nPrevious conversation:\n{context}"
if context
else task
)
for agent in self.agents:
if self.speaker_fn(
self.get_recent_messages(), agent
):
response = self._get_response_sync(
agent, task, turn
agent, contextualized_task, turn
)
current_turn.responses.append(response)
self.chat_history.total_messages += 1
@ -293,63 +308,63 @@ class GroupChat:
)
# if __name__ == "__main__":
# load_dotenv()
# # Get the OpenAI API key from the environment variable
# api_key = os.getenv("OPENAI_API_KEY")
# # Create an instance of the OpenAIChat class
# model = OpenAIChat(
# openai_api_key=api_key,
# model_name="gpt-4o-mini",
# temperature=0.1,
# )
# # Example agents
# agent1 = Agent(
# agent_name="Financial-Analysis-Agent",
# system_prompt="You are a financial analyst specializing in investment strategies.",
# llm=model,
# max_loops=1,
# autosave=False,
# dashboard=False,
# verbose=True,
# dynamic_temperature_enabled=True,
# user_name="swarms_corp",
# retry_attempts=1,
# context_length=200000,
# output_type="string",
# streaming_on=False,
# )
# agent2 = Agent(
# agent_name="Tax-Adviser-Agent",
# system_prompt="You are a tax adviser who provides clear and concise guidance on tax-related queries.",
# llm=model,
# max_loops=1,
# autosave=False,
# dashboard=False,
# verbose=True,
# dynamic_temperature_enabled=True,
# user_name="swarms_corp",
# retry_attempts=1,
# context_length=200000,
# output_type="string",
# streaming_on=False,
# )
# agents = [agent1, agent2]
# chat = GroupChat(
# name="Investment Advisory",
# description="Financial and tax analysis group",
# agents=agents,
# speaker_fn=expertise_based,
# )
# history = chat.run(
# "How to optimize tax strategy for investments?"
# )
# print(history.model_dump_json(indent=2))
if __name__ == "__main__":
load_dotenv()
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
openai_api_key=api_key,
model_name="gpt-4o-mini",
temperature=0.1,
)
# Example agents
agent1 = Agent(
agent_name="Financial-Analysis-Agent",
system_prompt="You are a financial analyst specializing in investment strategies.",
llm=model,
max_loops=1,
autosave=False,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
user_name="swarms_corp",
retry_attempts=1,
context_length=200000,
output_type="string",
streaming_on=False,
)
agent2 = Agent(
agent_name="Tax-Adviser-Agent",
system_prompt="You are a tax adviser who provides clear and concise guidance on tax-related queries.",
llm=model,
max_loops=1,
autosave=False,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
user_name="swarms_corp",
retry_attempts=1,
context_length=200000,
output_type="string",
streaming_on=False,
)
agents = [agent1, agent2]
chat = GroupChat(
name="Investment Advisory",
description="Financial and tax analysis group",
agents=agents,
speaker_fn=expertise_based,
)
history = chat.run(
"How to optimize tax strategy for investments?"
)
print(history.model_dump_json(indent=2))
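# The default speaker function is round_robin (every agent speaks each
# turn); a minimal sketch passing it explicitly for comparison with
# expertise_based:
# chat_rr = GroupChat(
#     name="Investment Advisory (round robin)",
#     agents=agents,
#     speaker_fn=round_robin,
# )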

@ -1,6 +1,5 @@
import asyncio
import concurrent.futures
import multiprocessing
import os
import re
from collections import Counter
@ -147,6 +146,7 @@ class MajorityVoting:
consensus_agent: Optional[Agent] = None,
autosave: bool = False,
verbose: bool = False,
max_loops: int = 1,
*args,
**kwargs,
):
@ -157,6 +157,7 @@ class MajorityVoting:
self.consensus_agent = consensus_agent
self.autosave = autosave
self.verbose = verbose
self.max_loops = max_loops
self.conversation = Conversation(
time_enabled=True, *args, **kwargs
@ -175,7 +176,9 @@ class MajorityVoting:
title="Majority Voting",
)
def run(self, task: str, *args, **kwargs) -> List[Any]:
def run(
self, task: str, correct_answer: str, *args, **kwargs
) -> List[Any]:
"""
Runs the majority voting system and returns the majority vote.
@ -200,25 +203,44 @@ class MajorityVoting:
)
self.conversation.add(agent.agent_name, response)
# Perform majority voting on the conversation
responses = [
message["content"]
for message in self.conversation.conversation_history
if message["role"] == "agent"
]
responses = self.conversation.return_history_as_string()
print(responses)
prompt = f"""Conduct a detailed majority voting analysis on the following conversation:
{responses}
Between the following agents: {[agent.agent_name for agent in self.agents]}
Please:
1. Identify the most common answer/recommendation across all agents
2. Analyze any major disparities or contrasting viewpoints between agents
3. Highlight key areas of consensus and disagreement
4. Evaluate the strength of the majority opinion
5. Note any unique insights from minority viewpoints
6. Provide a final synthesized recommendation based on the majority consensus
Focus on finding clear patterns while being mindful of important nuances in the responses.
"""
# If an output parser is provided, parse the responses
if self.output_parser is not None:
majority_vote = self.output_parser(
responses, *args, **kwargs
if self.consensus_agent is not None:
majority_vote = self.consensus_agent.run(
prompt
)
self.conversation.add(
self.consensus_agent.agent_name, majority_vote
)
elif self.consensus_agent is not None:
majority_vote = self.consensus_agent.run(responses)
else:
majority_vote = majority_voting(responses)
# Fall back to the last agent to synthesize the vote
majority_vote = self.agents[-1].run(prompt)
self.conversation.add(
self.agents[-1].agent_name, majority_vote
)
# Return the majority vote
return majority_vote
return self.conversation.return_history_as_string()
def batch_run(
self, tasks: List[str], *args, **kwargs
@ -262,23 +284,6 @@ class MajorityVoting:
for future in concurrent.futures.as_completed(futures)
]
def run_concurrently_multiprocess(
self, tasks: List[str], *args, **kwargs
) -> List[Any]:
"""
Runs the majority voting system concurrently using multiprocessing.
Args:
tasks (List[str]): List of tasks to be performed by the agents.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Returns:
List[Any]: List of majority votes for each task.
"""
with multiprocessing.Pool(processes=os.cpu_count()) as pool:
return pool.map(self.run, tasks)
async def run_async(
self, tasks: List[str], *args, **kwargs
) -> List[Any]:

@ -1,4 +1,5 @@
import asyncio
import os
import time
from typing import Any, Dict, List, Optional
@ -9,6 +10,8 @@ from swarms.telemetry.main import log_agent_data
from swarms.schemas.agent_step_schemas import ManySteps
from swarms.prompts.ag_prompt import aggregator_system_prompt
from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.any_to_str import any_to_str
import concurrent.futures
logger = initialize_logger(log_folder="mixture_of_agents")
@ -64,6 +67,8 @@ class MixtureOfAgents:
aggregator_agent: Agent = None,
aggregator_system_prompt: str = "",
layers: int = 3,
max_loops: int = 1,
return_str_on: bool = False,
) -> None:
"""
Initialize the Mixture of Agents class with agents and configuration.
@ -82,6 +87,8 @@ class MixtureOfAgents:
self.aggregator_agent: Agent = aggregator_agent
self.aggregator_system_prompt: str = aggregator_system_prompt
self.layers: int = layers
self.max_loops: int = max_loops
self.return_str_on: bool = return_str_on
self.input_schema = MixtureOfAgentsInput(
name=name,
@ -233,10 +240,61 @@ class MixtureOfAgents:
Args:
task (str): The task for the mixture of agents.
"""
asyncio.run(self._run_async(task))
try:
prev_context = None
for _ in range(self.max_loops):
# Add previous context to task if available
current_task = (
f"{task}\n\nPrevious context:\n{prev_context}"
if prev_context
else task
)
self.output_schema.task = task
# Run async process
asyncio.run(self._run_async(current_task))
# Store current results as context for next loop
prev_context = (
self.output_schema.aggregator_agent_summary
)
self.output_schema.task = task
log_agent_data(self.output_schema.model_dump())
if self.return_str_on:
return any_to_str(self.output_schema.model_dump())
else:
return self.output_schema.model_dump_json(indent=4)
except Exception as e:
logger.error(f"Error running mixture of agents: {str(e)}")
raise e
def run_batched(self, tasks: List[str]) -> List[str]:
"""
Run the mixture of agents for a batch of tasks.
Args:
tasks (List[str]): A list of tasks for the mixture of agents.
Returns:
List[str]: A list of responses from the mixture of agents.
"""
return [self.run(task) for task in tasks]
def run_concurrently(self, tasks: List[str]) -> List[str]:
"""
Run the mixture of agents for a batch of tasks concurrently.
"""
with concurrent.futures.ThreadPoolExecutor(
max_workers=os.cpu_count()
) as executor:
futures = [
executor.submit(self.run, task) for task in tasks
]
return [
future.result()
for future in concurrent.futures.as_completed(futures)
]
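# A minimal usage sketch (agent and aggregator setup assumed, not shown
# in this diff) combining the new max_loops and return_str_on options
# with run_concurrently:
# moa = MixtureOfAgents(
#     agents=[agent1, agent2],
#     aggregator_agent=aggregator_agent,
#     max_loops=2,  # feed each loop's aggregator summary into the next
#     return_str_on=True,  # return a plain string instead of JSON
# )
# outputs = moa.run_concurrently(["Task A", "Task B"])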

@ -3,16 +3,12 @@ import os
import threading
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from multiprocessing import cpu_count
from typing import Any, List
import psutil
from swarms.structs.agent import Agent
from swarms.structs.omni_agent_types import AgentType
from swarms.utils.wrapper_clusterop import (
exec_callable_with_clusterops,
)
@dataclass
@ -113,7 +109,7 @@ def run_agents_concurrently(
def run_agents_concurrently_multiprocess(
agents: List[Agent], task: str, batch_size: int = cpu_count()
agents: List[Agent], task: str, batch_size: int = os.cpu_count()
) -> List[Any]:
"""
Manage and run multiple agents concurrently in batches, with optimized performance.
@ -180,7 +176,7 @@ def run_agents_with_different_tasks(
agent, task = pair
return await run_agent_async(agent, task, executor)
cpu_cores = cpu_count()
cpu_cores = os.cpu_count()
batch_size = batch_size or cpu_cores
max_workers = max_workers or cpu_cores * 2
results = []
@ -253,7 +249,7 @@ def run_agents_with_timeout(
Returns:
List of outputs (None for timed out agents)
"""
cpu_cores = cpu_count()
cpu_cores = os.cpu_count()
batch_size = batch_size or cpu_cores
max_workers = max_workers or cpu_cores * 2
results = []
@ -412,22 +408,9 @@ def run_agents_with_tasks_concurrently(
List[Any]: A list of outputs from each agent execution.
"""
# Run the agents directly (cluster operations support removed)
if no_clusterops:
return _run_agents_with_tasks_concurrently(
agents, tasks, batch_size, max_workers
)
else:
return exec_callable_with_clusterops(
device,
device_id,
all_cores,
_run_agents_with_tasks_concurrently,
agents,
tasks,
batch_size,
max_workers,
)
return _run_agents_with_tasks_concurrently(
agents, tasks, batch_size, max_workers
)
# # Example usage:

@ -1,244 +0,0 @@
from multiprocessing import Manager, Pool, cpu_count
from typing import Sequence, Union, Callable, List
from concurrent.futures import ThreadPoolExecutor, as_completed
from swarms.structs.agent import Agent
from swarms.structs.base_workflow import BaseWorkflow
from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger(log_folder="multi_process_workflow")
class MultiProcessWorkflow(BaseWorkflow):
"""
Initialize a MultiProcessWorkflow object.
Args:
max_workers (int): The maximum number of workers to use for parallel processing.
autosave (bool): Flag indicating whether to automatically save the workflow.
agents (List[Union[Agent, Callable]]): A list of Agent objects or callable functions representing the workflow tasks.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Example:
>>> from swarms.structs.multi_process_workflow import MultiProcessingWorkflow
>>> from swarms.structs.task import Task
>>> from datetime import datetime
>>> from time import sleep
>>>
>>> # Define a simple task
>>> def simple_task():
>>> sleep(1)
>>> return datetime.now()
>>>
>>> # Create a task object
>>> task = Task(
>>> name="Simple Task",
>>> execute=simple_task,
>>> priority=1,
>>> )
>>>
>>> # Create a workflow with the task
>>> workflow = MultiProcessingWorkflow(tasks=[task])
>>>
>>> # Run the workflow
>>> results = workflow.run(task)
>>>
>>> # Print the results
>>> print(results)
"""
def __init__(
self,
max_workers: int = 5,
autosave: bool = True,
agents: Sequence[Union[Agent, Callable]] = None,
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self.max_workers = max_workers
self.autosave = autosave
self.agents = agents
self.max_workers or cpu_count()
# Log
logger.info(
(
"Initialized MultiProcessWorkflow with"
f" {self.max_workers} max workers and autosave set to"
f" {self.autosave}"
),
)
# Log the agents
if self.agents is not None:
for agent in self.agents:
logger.info(f"Agent: {agent.agent_name}")
def execute_task(self, task: str, *args, **kwargs):
"""Execute a task and handle exceptions.
Args:
task (Task): The task to execute.
*args: Additional positional arguments for the task execution.
**kwargs: Additional keyword arguments for the task execution.
Returns:
Any: The result of the task execution.
"""
try:
if self.agents is not None:
# Execute the task
for agent in self.agents:
result = agent.run(task, *args, **kwargs)
return result
except Exception as e:
logger.error(
(
"An error occurred during execution of task"
f" {task}: {str(e)}"
),
)
return None
def run(self, task: str, *args, **kwargs):
"""Run the workflow.
Args:
task (Task): The task to run.
*args: Additional positional arguments for the task execution.
**kwargs: Additional keyword arguments for the task execution.
Returns:
List[Any]: The results of all executed tasks.
"""
try:
results = []
with Manager() as manager:
with Pool(
processes=self.max_workers, *args, **kwargs
) as pool:
# Using manager.list() to collect results in a process safe way
results_list = manager.list()
jobs = [
pool.apply_async(
self.execute_task, # Pass the function, not the function call
args=(task,)
+ args, # Pass the arguments as a tuple
kwds=kwargs, # Pass the keyword arguments as a dictionary
callback=results_list.append,
timeout=task.timeout,
)
for agent in self.agents
]
# Wait for all jobs to complete
for job in jobs:
job.get()
results = list(results_list)
return results
except Exception as error:
logger.error(f"Error in run: {error}")
return None
async def async_run(self, task: str, *args, **kwargs):
"""Asynchronously run the workflow.
Args:
task (Task): The task to run.
*args: Additional positional arguments for the task execution.
**kwargs: Additional keyword arguments for the task execution.
Returns:
List[Any]: The results of all executed tasks.
"""
try:
results = []
with ThreadPoolExecutor(
max_workers=self.max_workers
) as executor:
futures = [
executor.submit(
self.execute_task, task, *args, **kwargs
)
for _ in range(len(self.agents))
]
for future in as_completed(futures):
result = future.result()
results.append(result)
return results
except Exception as error:
logger.error(f"Error in async_run: {error}")
return None
def batched_run(
self, tasks: List[str], batch_size: int = 5, *args, **kwargs
):
"""Run tasks in batches.
Args:
tasks (List[str]): A list of tasks to run.
batch_size (int): The size of each batch.
*args: Additional positional arguments for the task execution.
**kwargs: Additional keyword arguments for the task execution.
Returns:
List[Any]: The results of all executed tasks.
"""
try:
results = []
for i in range(0, len(tasks), batch_size):
batch = tasks[i : i + batch_size]
with Pool(processes=self.max_workers) as pool:
results_list = pool.map(
self.execute_task, batch, *args, **kwargs
)
results.extend(results_list)
return results
except Exception as error:
logger.error(f"Error in batched_run: {error}")
return None
def concurrent_run(self, tasks: List[str], *args, **kwargs):
"""Run tasks concurrently.
Args:
tasks (List[str]): A list of tasks to run.
*args: Additional positional arguments for the task execution.
**kwargs: Additional keyword arguments for the task execution.
Returns:
List[Any]: The results of all executed tasks.
"""
try:
results = []
with ThreadPoolExecutor(
max_workers=self.max_workers
) as executor:
futures = [
executor.submit(
self.execute_task, task, *args, **kwargs
)
for task in tasks
]
for future in as_completed(futures):
result = future.result()
results.append(result)
return results
except Exception as error:
logger.error(f"Error in concurrent_run: {error}")
return None

@ -13,9 +13,6 @@ from swarms.structs.agents_available import showcase_available_agents
from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.output_types import OutputType
from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.wrapper_clusterop import (
exec_callable_with_clusterops,
)
from swarms.telemetry.main import log_agent_data
logger = initialize_logger(log_folder="rearrange")
@ -523,29 +520,13 @@ class AgentRearrange(BaseSwarm):
The result from executing the task.
"""
try:
no_use_clusterops = (
no_use_clusterops or self.no_use_clusterops
return self._run(
task=task,
img=img,
*args,
**kwargs,
)
if no_use_clusterops is True:
return self._run(
task=task,
img=img,
*args,
**kwargs,
)
else:
return exec_callable_with_clusterops(
device=device,
device_id=device_id,
all_cores=all_cores,
all_gpus=all_gpus,
func=self._run,
task=task,
img=img,
*args,
**kwargs,
)
except Exception as e:
self._catch_error(e)

@ -104,10 +104,6 @@ class SwarmRearrange:
# Run the reliability checks
self.reliability_checks()
# Logging configuration
if self.verbose:
logger.add("swarm_rearrange.log", rotation="10 MB")
def reliability_checks(self):
logger.info("Running reliability checks.")
if not self.swarms:

@ -14,8 +14,6 @@ from swarms.structs.agent import Agent
from swarms.structs.swarm_router import SwarmRouter, SwarmType
from swarms.utils.function_caller_model import OpenAIFunctionCaller
logger.add("swarm_builder.log", rotation="10 MB", backtrace=True)
BOSS_SYSTEM_PROMPT = """
Manage a swarm of worker agents to efficiently serve the user by deciding whether to create new agents or delegate tasks. Ensure operations are efficient and effective.

@ -0,0 +1,326 @@
import math
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Dict, Optional, Tuple
from datasets import Dataset, load_dataset
from loguru import logger
from tqdm import tqdm
# -----------------------------------------------------------------------------
# Logging configuration: log to console and file (rotating by size)
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# Swarm interface example
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# Benchmark configuration
# -----------------------------------------------------------------------------
class BenchmarkConfig:
"""
Configuration for a benchmark dataset.
Attributes:
input_column (str): The column containing the task prompt.
answer_column (str): The column containing the expected answer.
answer_extractor (Optional[Callable[[Any], str]]): Function to extract
a string answer from the dataset's raw answer format.
answer_matcher (Optional[Callable[[str, str], bool]]): Function to compare
the expected answer and the swarm output. If None, a simple substring
containment is used.
"""
def __init__(
self,
input_column: str,
answer_column: str,
answer_extractor: Optional[Callable[[Any], str]] = None,
answer_matcher: Optional[Callable[[str, str], bool]] = None,
):
self.input_column = input_column
self.answer_column = answer_column
self.answer_extractor = answer_extractor
self.answer_matcher = answer_matcher
# -----------------------------------------------------------------------------
# Preset dataset configurations for popular benchmarks
# -----------------------------------------------------------------------------
PRESET_DATASETS: Dict[str, BenchmarkConfig] = {
"gsm8k": BenchmarkConfig(
input_column="question",
answer_column="answer",
),
"squad": BenchmarkConfig(
input_column="question",
answer_column="answers",
answer_extractor=lambda ans: (
ans["text"][0]
if isinstance(ans, dict)
and "text" in ans
and isinstance(ans["text"], list)
and ans["text"]
else str(ans)
),
),
"winogrande": BenchmarkConfig(
input_column="sentence",
answer_column="answer",
),
"commonsense_qa": BenchmarkConfig(
input_column="question",
answer_column="answerKey",
),
# Add additional presets here.
}
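# A hypothetical custom configuration (not one of the presets above):
# gsm8k reference answers end in "#### <number>", so extracting the
# final number and matching on it is stricter than substring
# containment over the whole rationale.
def _gsm8k_final_number(raw_answer: str) -> str:
    # Keep only the text after the last "####" marker, if present.
    return str(raw_answer).rsplit("####", 1)[-1].strip()

GSM8K_STRICT_CONFIG = BenchmarkConfig(
    input_column="question",
    answer_column="answer",
    answer_extractor=_gsm8k_final_number,
    answer_matcher=lambda expected, output: expected in output,
)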
# -----------------------------------------------------------------------------
# SwarmEvaluator with extended features
# -----------------------------------------------------------------------------
class SwarmEvaluator:
"""
Evaluator that uses a swarm of agents to process benchmark datasets
from Hugging Face, with concurrency, retries, progress display, performance timing,
and customizable answer matching.
Example:
swarm = Swarm()
evaluator = SwarmEvaluator(swarm)
results = evaluator.evaluate("gsm8k", split="test", max_workers=4)
print(results)
"""
def __init__(self, swarm: callable) -> None:
"""
Initialize the evaluator with a given swarm.
Args:
swarm (Swarm): A swarm instance with a callable run(task: str) method.
"""
self.swarm = swarm
def evaluate(
self,
dataset_name: str,
split: str = "test",
config: Optional[BenchmarkConfig] = None,
max_workers: int = 1,
max_retries: int = 3,
show_progress: bool = True,
output_file: Optional[str] = None,
) -> Dict[str, Any]:
"""
Evaluate the specified benchmark dataset using the swarm.
Args:
dataset_name (str): The dataset name (from Hugging Face).
split (str): The dataset split (e.g., "test", "validation").
config (Optional[BenchmarkConfig]): Benchmark configuration. If None,
a preset config is used.
max_workers (int): Number of concurrent workers.
max_retries (int): Number of retries for swarm tasks on failure.
show_progress (bool): If True, display a progress bar.
output_file (Optional[str]): Path to a file to write the results.
Returns:
Dict[str, Any]: Evaluation metrics including total examples, correct answers,
accuracy, and total evaluation time.
"""
if config is None:
config = PRESET_DATASETS.get(dataset_name)
if config is None:
raise ValueError(
f"No preset config for dataset '{dataset_name}'. Provide a BenchmarkConfig."
)
logger.info(
f"Loading dataset '{dataset_name}' (split: {split})..."
)
dataset: Dataset = load_dataset(dataset_name, split=split)
total_examples = len(dataset)
logger.info(f"Total examples to evaluate: {total_examples}")
start_time = time.time()
correct = 0
# Function to process a single example.
def _process_example(
example: Dict[str, Any], idx: int
) -> Tuple[bool, float]:
task_start = time.time()
task_text = example.get(config.input_column)
expected_answer = example.get(config.answer_column)
if task_text is None or expected_answer is None:
logger.warning(
f"Example {idx}: Missing '{config.input_column}' or '{config.answer_column}', skipping."
)
return (False, 0.0)
# Use answer_extractor if provided.
if config.answer_extractor:
try:
expected_answer = config.answer_extractor(
expected_answer
)
except Exception as e:
logger.error(
f"Example {idx}: Error extracting answer: {e}"
)
return (False, 0.0)
logger.debug(f"Example {idx} - Task: {task_text}")
logger.debug(
f"Example {idx} - Expected Answer: {expected_answer}"
)
try:
swarm_output = self._run_with_retry(
task_text, max_retries
)
except Exception as e:
logger.error(
f"Example {idx}: Failed after retries. Error: {e}"
)
return (False, time.time() - task_start)
logger.debug(
f"Example {idx} - Swarm Output: {swarm_output}"
)
# Use custom matcher if provided; otherwise, default matching.
if config.answer_matcher:
is_correct = config.answer_matcher(
expected_answer, swarm_output
)
else:
is_correct = self._default_matcher(
expected_answer, swarm_output
)
task_time = time.time() - task_start
logger.info(
f"Example {idx}: {'Correct' if is_correct else 'Incorrect'} in {task_time:.2f}s"
)
return (is_correct, task_time)
# Use ThreadPoolExecutor for concurrency.
futures = []
total_time = 0.0
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Optionally wrap the dataset with tqdm for a progress bar.
examples_iter = enumerate(dataset, start=1)
if show_progress:
examples_iter = tqdm(
list(examples_iter),
total=total_examples,
desc="Evaluating",
)
for idx, example in examples_iter:
futures.append(
executor.submit(_process_example, example, idx)
)
for future in as_completed(futures):
try:
is_correct, elapsed = future.result()
total_time += elapsed
if is_correct:
correct += 1
except Exception as e:
logger.error(f"Error processing an example: {e}")
overall_time = time.time() - start_time
accuracy = (
correct / total_examples if total_examples > 0 else 0.0
)
logger.info(
f"Evaluation complete. Total examples: {total_examples}, Correct: {correct}, "
f"Accuracy: {accuracy:.2%}, Overall Time: {overall_time:.2f}s, "
f"Average per-example time: {total_time/total_examples if total_examples else 0:.2f}s"
)
results = {
"total": total_examples,
"correct": correct,
"accuracy": accuracy,
"overall_time": overall_time,
"average_example_time": (
total_time / total_examples
if total_examples
else math.nan
),
}
# Optionally save results to a file.
if output_file:
try:
with open(output_file, "w") as f:
for key, value in results.items():
f.write(f"{key}: {value}\n")
logger.info(f"Results saved to {output_file}")
except Exception as e:
logger.error(
f"Error saving results to {output_file}: {e}"
)
return results
def _run_with_retry(self, task: str, max_retries: int) -> str:
"""
Runs the swarm task with a retry mechanism.
Args:
task (str): The task string.
max_retries (int): Maximum number of retries.
Returns:
str: Swarm output.
Raises:
Exception: If all retries fail.
"""
attempt = 0
while attempt <= max_retries:
try:
start = time.time()
result = self.swarm.run(task)
elapsed = time.time() - start
logger.debug(
f"Task succeeded in {elapsed:.2f}s on attempt {attempt + 1}"
)
return result
except Exception as e:
logger.warning(
f"Task failed on attempt {attempt + 1}: {e}"
)
attempt += 1
time.sleep(0.5 * attempt)  # Linear backoff between retries
raise Exception("Max retries exceeded for task.")
@staticmethod
def _default_matcher(expected: str, output: str) -> bool:
"""
Default answer matching using a normalized substring check.
Args:
expected (str): The expected answer.
output (str): The swarm output.
Returns:
bool: True if expected is found in output; otherwise, False.
"""
expected_norm = " ".join(expected.strip().split())
output_norm = " ".join(output.strip().split())
return expected_norm in output_norm
# -----------------------------------------------------------------------------
# Example usage
# -----------------------------------------------------------------------------
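# A minimal sketch, assuming any object exposing run(task: str) -> str:
#
# from swarms.structs.agent import Agent
#
# class SingleAgentSwarm:
#     def __init__(self):
#         self.agent = Agent(
#             agent_name="Solver",
#             system_prompt="Answer the question concisely.",
#             model_name="gpt-4o-mini",
#             max_loops=1,
#         )
#
#     def run(self, task: str) -> str:
#         return self.agent.run(task)
#
# evaluator = SwarmEvaluator(SingleAgentSwarm())
# results = evaluator.evaluate("gsm8k", split="test", max_workers=4)
# print(results)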

@ -42,8 +42,6 @@ class SwarmMatcher:
Args:
config (SwarmMatcherConfig): The configuration for the SwarmMatcher.
"""
logger.add("swarm_matcher_debug.log", level="DEBUG")
logger.debug("Initializing SwarmMatcher")
try:
import torch

@ -1,3 +1,4 @@
import os
import uuid
from datetime import datetime
from typing import Any, Callable, Dict, List, Literal, Union
@ -8,16 +9,14 @@ from tenacity import retry, stop_after_attempt, wait_fixed
from swarms.prompts.ag_prompt import aggregator_system_prompt
from swarms.structs.agent import Agent
from swarms.structs.concurrent_workflow import ConcurrentWorkflow
from swarms.structs.csv_to_agent import AgentLoader
from swarms.structs.groupchat import GroupChat
from swarms.structs.mixture_of_agents import MixtureOfAgents
from swarms.structs.multi_agent_orchestrator import MultiAgentRouter
from swarms.structs.rearrange import AgentRearrange
from swarms.structs.sequential_workflow import SequentialWorkflow
from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm
from swarms.structs.groupchat import GroupChat
from swarms.structs.multi_agent_orchestrator import MultiAgentRouter
from swarms.structs.swarm_matcher import swarm_matcher
from swarms.utils.wrapper_clusterop import (
exec_callable_with_clusterops,
)
from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger(log_folder="swarm_router")
@ -142,6 +141,8 @@ class SwarmRouter:
output_type: str = "string", # Md, PDF, Txt, csv
no_cluster_ops: bool = False,
speaker_fn: callable = None,
load_agents_from_csv: bool = False,
csv_file_path: str = None,
*args,
**kwargs,
):
@ -161,6 +162,13 @@ class SwarmRouter:
self.no_cluster_ops = no_cluster_ops
self.speaker_fn = speaker_fn
self.logs = []
self.load_agents_from_csv = load_agents_from_csv
self.csv_file_path = csv_file_path
if self.load_agents_from_csv:
self.agents = AgentLoader(
csv_path=self.csv_file_path
).load_agents()
self.reliability_check()
@ -180,10 +188,6 @@ class SwarmRouter:
if self.rules is not None:
self.handle_rules()
def deactivate_clusterops(self):
for agent in self.agents:
agent.do_not_use_cluster_ops = True
def activate_shared_memory(self):
logger.info("Activating shared memory with all agents ")
@ -273,9 +277,6 @@ class SwarmRouter:
self._create_swarm(self.swarm_type)
if self.no_cluster_ops:
self.deactivate_clusterops()
elif self.swarm_type == "AgentRearrange":
return AgentRearrange(
name=self.name,
@ -428,10 +429,6 @@ class SwarmRouter:
self,
task: str,
img: str = None,
device: str = "cpu",
all_cores: bool = True,
all_gpus: bool = False,
no_clusterops: bool = True,
*args,
**kwargs,
) -> Any:
@ -453,18 +450,7 @@ class SwarmRouter:
Exception: If an error occurs during task execution.
"""
try:
if no_clusterops:
return self._run(task=task, img=img, *args, **kwargs)
else:
return exec_callable_with_clusterops(
func=self._run,
device=device,
all_cores=all_cores,
all_gpus=all_gpus,
task=task,
*args,
**kwargs,
)
return self._run(task=task, img=img, *args, **kwargs)
except Exception as e:
logger.error(f"Error executing task on swarm: {str(e)}")
raise
@ -515,41 +501,6 @@ class SwarmRouter:
raise
return results
def threaded_run(self, task: str, *args, **kwargs) -> Any:
"""
Execute a task on the selected or matched swarm type using threading.
Args:
task (str): The task to be executed by the swarm.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Returns:
Any: The result of the swarm's execution.
Raises:
Exception: If an error occurs during task execution.
"""
from threading import Thread
def run_in_thread():
try:
result = self.run(task, *args, **kwargs)
return result
except Exception as e:
self._log(
"error",
f"Error occurred while running task in thread on {self.swarm_type} swarm: {str(e)}",
task=task,
metadata={"error": str(e)},
)
raise
thread = Thread(target=run_in_thread)
thread.start()
thread.join()
return thread.result
def async_run(self, task: str, *args, **kwargs) -> Any:
"""
Execute a task on the selected or matched swarm type asynchronously.
@ -610,7 +561,9 @@ class SwarmRouter:
"""
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as executor:
with ThreadPoolExecutor(
max_workers=os.cpu_count()
) as executor:
future = executor.submit(self.run, task, *args, **kwargs)
result = future.result()
return result
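# A minimal sketch of the new CSV loading path; the file name is
# hypothetical and the CSV schema is whatever AgentLoader expects:
# router = SwarmRouter(
#     name="csv-router",
#     swarm_type="SequentialWorkflow",
#     load_agents_from_csv=True,
#     csv_file_path="agents.csv",
# )
# output = router.run("Summarize Q4 earnings risks.")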

@ -299,6 +299,7 @@ def log_agent_data(data_dict: dict) -> dict | None:
requests.exceptions.RequestException,
requests.exceptions.JSONDecodeError,
):
pass # Fail silently without any action
return None # Return None if anything goes wrong
return None # Return None if anything goes wrong
# print(log_agent_data(get_user_device_data()))

@ -1,10 +1,10 @@
import os
import uuid
import sys
from loguru import logger
def initialize_logger(log_folder: str = "logs"):
AGENT_WORKSPACE = "agent_workspace"
# Check if WORKSPACE_DIR is set, if not, set it to AGENT_WORKSPACE
@ -24,14 +24,27 @@ def initialize_logger(log_folder: str = "logs"):
log_folder_path, f"{log_folder}_{uuid_for_log}.log"
)
# Remove default handler and add custom handlers
logger.remove()
# Add console handler with colors
logger.add(
sys.stdout,
colorize=True,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
level="INFO",
)
# Add file handler
logger.add(
log_file_path,
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
level="INFO",
colorize=True,
backtrace=True,
diagnose=True,
enqueue=True,
retention="10 days",
# compression="zip",
)
return logger
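# A minimal usage sketch: each call returns the shared loguru logger,
# now writing to stdout and to a per-call file under the log folder.
# log = initialize_logger(log_folder="my_component")
# log.info("Component started")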

@ -113,3 +113,15 @@ def exec_callable_with_clusterops(
if enable_logging:
logger.error(f"An error occurred during execution: {e}")
raise
# def test_clusterops(x):
# return x + 1
# example = exec_callable_with_clusterops(
# device="cpu",
# all_cores=True,
# func = test_clusterops,
# )
# print(example)

@ -81,12 +81,6 @@ class SwarmsIssueReporter:
# Initialize logging
log_path = os.path.join(os.getcwd(), "logs", log_file)
os.makedirs(os.path.dirname(log_path), exist_ok=True)
logger.add(
log_path,
rotation="1 day",
retention="1 month",
compression="zip",
)
# Issue tracking
self.issues_created = []

@ -9,15 +9,6 @@ from swarm_models import OpenAIChat
GITHUB_REPO = "kyegomez/swarms" # Swarms GitHub repository
GITHUB_API_URL = f"https://api.github.com/repos/{GITHUB_REPO}/commits"
# Initialize Loguru
logger.add(
"commit_summary.log",
rotation="1 MB",
level="INFO",
backtrace=True,
diagnose=True,
)
# Step 1: Fetch the latest commits from GitHub
def fetch_latest_commits(

@ -1,177 +0,0 @@
import asyncio
import time
from swarms.structs.agent import Agent
from swarms.structs.multi_process_workflow import MultiProcessWorkflow
def create_test_agent(name: str) -> Agent:
"""Create a test agent that simply returns its input with a timestamp"""
return Agent(
agent_name=name,
system_prompt=f"Test prompt for {name}",
model_name="gpt-4o-mini",
max_loops=1,
)
def test_initialization():
"""Test basic workflow initialization"""
print("\n=== Testing Workflow Initialization ===")
try:
agents = [create_test_agent(f"agent{i}") for i in range(3)]
workflow = MultiProcessWorkflow(max_workers=2, agents=agents)
print("✓ Created workflow with configuration:")
print(f" - Max workers: {workflow.max_workers}")
print(f" - Number of agents: {len(workflow.agents)}")
print(f" - Autosave: {workflow.autosave}")
print("✓ Initialization test passed")
except Exception as e:
print(f"✗ Initialization test failed: {str(e)}")
raise
def test_execute_task():
"""Test execution of a single task"""
print("\n=== Testing Task Execution ===")
try:
agents = [create_test_agent("test_agent")]
workflow = MultiProcessWorkflow(agents=agents)
test_task = "Return this message with timestamp"
result = workflow.execute_task(test_task)
print("✓ Task executed successfully")
print(f" - Input task: {test_task}")
print(f" - Result: {result}")
print("✓ Task execution test passed")
except Exception as e:
print(f"✗ Task execution test failed: {str(e)}")
raise
def test_parallel_run():
"""Test parallel execution of tasks"""
print("\n=== Testing Parallel Run ===")
try:
agents = [create_test_agent(f"agent{i}") for i in range(3)]
workflow = MultiProcessWorkflow(max_workers=2, agents=agents)
test_task = "Process this in parallel"
results = workflow.run(test_task)
print("✓ Parallel execution completed")
# print(f" - Number of results: {len(results)}")
print(f" - Results: {results}")
print("✓ Parallel run test passed")
except Exception as e:
print(f"✗ Parallel run test failed: {str(e)}")
raise
async def test_async_run():
"""Test asynchronous execution of tasks"""
print("\n=== Testing Async Run ===")
try:
agents = [create_test_agent(f"agent{i}") for i in range(3)]
workflow = MultiProcessWorkflow(max_workers=2, agents=agents)
test_task = "Process this asynchronously"
results = await workflow.async_run(test_task)
print("✓ Async execution completed")
print(f" - Number of results: {len(results)}")
print(f" - Results: {results}")
print("✓ Async run test passed")
except Exception as e:
print(f"✗ Async run test failed: {str(e)}")
raise
def test_batched_run():
"""Test batch execution of tasks"""
print("\n=== Testing Batched Run ===")
try:
agents = [create_test_agent(f"agent{i}") for i in range(2)]
workflow = MultiProcessWorkflow(max_workers=2, agents=agents)
tasks = [f"Batch task {i}" for i in range(5)]
results = workflow.batched_run(tasks, batch_size=2)
print("✓ Batch execution completed")
print(f" - Number of tasks: {len(tasks)}")
print(" - Batch size: 2")
print(f" - Results: {results}")
print("✓ Batched run test passed")
except Exception as e:
print(f"✗ Batched run test failed: {str(e)}")
raise
def test_concurrent_run():
"""Test concurrent execution of tasks"""
print("\n=== Testing Concurrent Run ===")
try:
agents = [create_test_agent(f"agent{i}") for i in range(2)]
workflow = MultiProcessWorkflow(max_workers=2, agents=agents)
tasks = [f"Concurrent task {i}" for i in range(4)]
results = workflow.concurrent_run(tasks)
print("✓ Concurrent execution completed")
print(f" - Number of tasks: {len(tasks)}")
print(f" - Results: {results}")
print("✓ Concurrent run test passed")
except Exception as e:
print(f"✗ Concurrent run test failed: {str(e)}")
raise
def test_error_handling():
"""Test error handling in workflow"""
print("\n=== Testing Error Handling ===")
try:
# Create workflow with no agents to trigger error
workflow = MultiProcessWorkflow(max_workers=2, agents=None)
result = workflow.execute_task(
"This should handle the error gracefully"
)
print("✓ Error handled gracefully")
print(f" - Result when no agents: {result}")
print("✓ Error handling test passed")
except Exception as e:
print(f"✗ Error handling test failed: {str(e)}")
raise
async def run_all_tests():
"""Run all tests"""
print("\n=== Starting MultiProcessWorkflow Test Suite ===")
start_time = time.time()
try:
# Run synchronous tests
test_initialization()
test_execute_task()
test_parallel_run()
test_batched_run()
test_concurrent_run()
test_error_handling()
# Run async test
await test_async_run()
end_time = time.time()
duration = round(end_time - start_time, 2)
print("\n=== Test Suite Completed Successfully ===")
print(f"Time taken: {duration} seconds")
except Exception as e:
print("\n=== Test Suite Failed ===")
print(f"Error: {str(e)}")
raise
if __name__ == "__main__":
asyncio.run(run_all_tests())