From 928e1ecf4575cf11a03344b93cc741a688184d42 Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Wed, 12 Feb 2025 15:31:02 -0800 Subject: [PATCH] [7.1.7] [cleanup] [no clusterops] [majorityvoting fix] --- .../models/models_available_overview.md | 2 - docs/swarms/structs/majorityvoting.md | 314 ++++----- docs/swarms/structs/moa.md | 220 ++++++- docs/swarms/structs/multi_process_workflow.md | 124 ---- .../structs/multi_processing_workflow.md | 204 ------ example.py | 1 - examples/chart_swarm.py | 388 +++++++++++ examples/morgtate_swarm.py | 603 ++++++++++++++++++ examples/reasoning_duo.py | 156 +++++ examples/swarm_eval_deepseek.py | 170 +++++ majority_voting_example.py | 52 ++ pyproject.toml | 3 +- requirements.txt | 3 +- simple_example_ollama.py | 18 + swarms/structs/__init__.py | 2 - swarms/structs/agent.py | 47 +- swarms/structs/base_workflow.py | 55 +- swarms/structs/csv_to_agent.py | 2 +- swarms/structs/graph_swarm.py | 9 - swarms/structs/groupchat.py | 143 +++-- swarms/structs/majority_voting.py | 69 +- swarms/structs/mixture_of_agents.py | 66 +- swarms/structs/multi_agent_exec.py | 29 +- swarms/structs/multi_process_workflow.py | 244 ------- swarms/structs/rearrange.py | 29 +- swarms/structs/swarm_arange.py | 4 - swarms/structs/swarm_builder.py | 2 - swarms/structs/swarm_eval.py | 326 ++++++++++ swarms/structs/swarm_matcher.py | 2 - swarms/structs/swarm_router.py | 81 +-- swarms/telemetry/main.py | 5 +- swarms/utils/loguru_logger.py | 17 +- swarms/utils/wrapper_clusterop.py | 12 + tests/agent_evals/auto_test_eval.py | 6 - tests/agent_evals/github_summarizer_agent.py | 9 - tests/structs/test_multiprocess.py | 177 ----- 36 files changed, 2343 insertions(+), 1251 deletions(-) delete mode 100644 docs/swarms/structs/multi_process_workflow.md delete mode 100644 docs/swarms/structs/multi_processing_workflow.md create mode 100644 examples/chart_swarm.py create mode 100644 examples/morgtate_swarm.py create mode 100644 examples/reasoning_duo.py create mode 100644 
examples/swarm_eval_deepseek.py create mode 100644 majority_voting_example.py create mode 100644 simple_example_ollama.py delete mode 100644 swarms/structs/multi_process_workflow.py create mode 100644 swarms/structs/swarm_eval.py delete mode 100644 tests/structs/test_multiprocess.py diff --git a/docs/swarms/models/models_available_overview.md b/docs/swarms/models/models_available_overview.md index 21ce54a7..c3c12127 100644 --- a/docs/swarms/models/models_available_overview.md +++ b/docs/swarms/models/models_available_overview.md @@ -278,8 +278,6 @@ Use logging to monitor the behavior and performance of your models. The `loguru` ```python from loguru import logger -logger.add("file.log", rotation="10 MB") - # Log model interactions logger.info("Running task on Anthropic model") response = model(task) diff --git a/docs/swarms/structs/majorityvoting.md b/docs/swarms/structs/majorityvoting.md index 84ac02c8..e44ccdd7 100644 --- a/docs/swarms/structs/majorityvoting.md +++ b/docs/swarms/structs/majorityvoting.md @@ -2,216 +2,216 @@ The `MajorityVoting` module provides a mechanism for performing majority voting among a group of agents. Majority voting is a decision rule that selects the option which has the majority of votes. This is particularly useful in systems where multiple agents provide responses to a query, and the most common response needs to be identified as the final output. 
+## Architecture + +```mermaid +graph TD + A[MajorityVoting System] --> B[Initialize Agents] + B --> C[Process Task] + C --> D{Execution Mode} + D --> E[Single Task] + D --> F[Batch Tasks] + D --> G[Concurrent Tasks] + D --> H[Async Tasks] + E --> I[Run Agents] + F --> I + G --> I + H --> I + I --> J[Collect Responses] + J --> K[Consensus Analysis] + K --> L{Consensus Agent?} + L -->|Yes| M[Use Consensus Agent] + L -->|No| N[Use Last Agent] + M --> O[Final Output] + N --> O + O --> P[Save Conversation] +``` + ### Key Concepts - **Majority Voting**: A method to determine the most common response from a set of answers. - **Agents**: Entities (e.g., models, algorithms) that provide responses to tasks or queries. - **Output Parser**: A function that processes the responses from the agents before performing the majority voting. +- **Consensus Agent**: An optional agent that analyzes the responses from all agents to determine the final consensus. +- **Conversation History**: A record of all agent interactions and responses during the voting process. -## Function Definitions - -### Function: `majority_voting` - -Performs majority voting on a list of answers and returns the most common answer. +## Class Definition: `MajorityVoting` -#### Parameters +### Parameters -| Parameter | Type | Description | -|-----------|----------|------------------------------| -| `answers` | `List[str]` | A list of answers from different agents. | +| Parameter | Type | Description | +|------------------|----------------|-----------------------------------------------------------------------------| +| `name` | `str` | Name of the majority voting system. Default is "MajorityVoting". | +| `description` | `str` | Description of the system. Default is "A majority voting system for agents". | +| `agents` | `List[Agent]` | A list of agents to be used in the majority voting system. | +| `output_parser` | `Callable` | Function to parse agent outputs. Default is `majority_voting` function. 
| +| `consensus_agent`| `Agent` | Optional agent for analyzing consensus among responses. | +| `autosave` | `bool` | Whether to autosave conversations. Default is `False`. | +| `verbose` | `bool` | Whether to enable verbose logging. Default is `False`. | +| `max_loops` | `int` | Maximum number of voting loops. Default is 1. | -#### Returns +### Methods -| Return Value | Type | Description | -|--------------|-------|----------------------------------------| -| `answer` | `str` | The most common answer in the list. If the list is empty, returns "I don't know". | +#### `run(task: str, correct_answer: str, *args, **kwargs) -> List[Any]` -## Class Definitions +Runs the majority voting system for a single task. -### Class: `MajorityVoting` +**Parameters:** +- `task` (str): The task to be performed by the agents +- `correct_answer` (str): The correct answer for evaluation +- `*args`, `**kwargs`: Additional arguments -Class representing a majority voting system for agents. +**Returns:** +- List[Any]: The conversation history as a string, including the majority vote -#### Parameters +#### `batch_run(tasks: List[str], *args, **kwargs) -> List[Any]` -| Parameter | Type | Description | -|------------------|--------------|-----------------------------------------------------------------------------| -| `agents` | `List[Agent]`| A list of agents to be used in the majority voting system. | -| `output_parser` | `Callable` | A function used to parse the output of the agents. If not provided, the default `majority_voting` function is used. | -| `autosave` | `bool` | A boolean indicating whether to autosave the conversation to a file. Default is `False`. | -| `verbose` | `bool` | A boolean indicating whether to enable verbose logging. Default is `False`. | +Runs multiple tasks in sequence. -### Method: `__init__` +**Parameters:** +- `tasks` (List[str]): List of tasks to be performed +- `*args`, `**kwargs`: Additional arguments -Initializes the `MajorityVoting` system. 
+**Returns:** +- List[Any]: List of majority votes for each task -#### Parameters +#### `run_concurrently(tasks: List[str], *args, **kwargs) -> List[Any]` -| Parameter | Type | Description | -|------------------|----------------|-----------------------------------------------------------------------------| -| `agents` | `List[Agent]` | A list of agents to be used in the majority voting system. | -| `output_parser` | `Callable` | A function used to parse the output of the agents. Default is the `majority_voting` function. | -| `autosave` | `bool` | A boolean indicating whether to autosave the conversation to a file. Default is `False`. | -| `verbose` | `bool` | A boolean indicating whether to enable verbose logging. Default is `False`. | -| `args` | `tuple` | Additional positional arguments. | -| `kwargs` | `dict` | Additional keyword arguments. | +Runs multiple tasks concurrently using thread pooling. -### Method: `run` +**Parameters:** +- `tasks` (List[str]): List of tasks to be performed +- `*args`, `**kwargs`: Additional arguments -Runs the majority voting system and returns the majority vote. +**Returns:** +- List[Any]: List of majority votes for each task -#### Parameters +#### `run_async(tasks: List[str], *args, **kwargs) -> List[Any]` -| Parameter | Type | Description | -|-----------|------------|------------------------------------------| -| `task` | `str` | The task to be performed by the agents. | -| `args` | `tuple` | Variable length argument list. | -| `kwargs` | `dict` | Arbitrary keyword arguments. | +Runs multiple tasks asynchronously using asyncio. -#### Returns +**Parameters:** +- `tasks` (List[str]): List of tasks to be performed +- `*args`, `**kwargs`: Additional arguments -| Return Value | Type | Description | -|--------------|-----------|--------------------------------------| -| `results` | `List[Any]` | The majority vote. 
| +**Returns:** +- List[Any]: List of majority votes for each task ## Usage Examples -### Example 1: Basic Majority Voting +### Example 1: Basic Single Task Execution with Modern LLMs ```python -from swarms.structs.agent import Agent -from swarms.structs.majority_voting import MajorityVoting +from swarms import Agent, MajorityVoting -# Initialize agents +# Initialize multiple agents with different specialties agents = [ Agent( - agent_name="Devin", - system_prompt=( - "Autonomous agent that can interact with humans and other" - " agents. Be Helpful and Kind. Use the tools provided to" - " assist the user. Return all code in markdown format." - ), - llm=llm, - max_loops="auto", - autosave=True, - dashboard=False, - streaming_on=True, - verbose=True, - stopping_token="", - interactive=True, - tools=[terminal, browser, file_editor, create_file], - code_interpreter=True, + agent_name="Financial-Analysis-Agent", + agent_description="Personal finance advisor focused on market analysis", + system_prompt="You are a financial advisor specializing in market analysis and investment opportunities.", + max_loops=1, + model_name="gpt-4o" ), Agent( - agent_name="Codex", - system_prompt=( - "An AI coding assistant capable of writing and understanding" - " code snippets in various programming languages." - ), - llm=llm, - max_loops="auto", - autosave=True, - dashboard=False, - streaming_on=True, - verbose=True, - stopping_token="", - interactive=True, - tools=[terminal, browser, file_editor, create_file], - code_interpreter=True, + agent_name="Risk-Assessment-Agent", + agent_description="Risk analysis and portfolio management expert", + system_prompt="You are a risk assessment expert focused on evaluating investment risks and portfolio diversification.", + max_loops=1, + model_name="gpt-4o" ), Agent( - agent_name="Tabnine", - system_prompt=( - "A code completion AI that provides suggestions for code" - " completion and code improvements." 
- ), - llm=llm, - max_loops="auto", - autosave=True, - dashboard=False, - streaming_on=True, - verbose=True, - stopping_token="", - interactive=True, - tools=[terminal, browser, file_editor, create_file], - code_interpreter=True, - ), + agent_name="Tech-Investment-Agent", + agent_description="Technology sector investment specialist", + system_prompt="You are a technology investment specialist focused on AI, emerging tech, and growth opportunities.", + max_loops=1, + model_name="gpt-4o" + ) ] -# Create MajorityVoting instance -majority_voting = MajorityVoting(agents) -# Run the majority voting system -result = majority_voting.run("What is the capital of France?") -print(result) # Output: 'Paris' +consensus_agent = Agent( + agent_name="Consensus-Agent", + agent_description="Consensus agent focused on analyzing investment advice", + system_prompt="You are a consensus agent focused on analyzing investment advice and providing a final answer.", + max_loops=1, + model_name="gpt-4o" +) + +# Create majority voting system +majority_voting = MajorityVoting( + name="Investment-Advisory-System", + description="Multi-agent system for investment advice", + agents=agents, + verbose=True, + consensus_agent=consensus_agent +) + +# Run the analysis with majority voting +result = majority_voting.run( + task="Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.", + correct_answer="" # Optional evaluation metric +) + +print(result) + ``` -### Example 2: Running a Task with Detailed Outputs +## Batch Execution ```python -from swarms.structs.agent import Agent -from swarms.structs.majority_voting import MajorityVoting +from swarms import Agent, MajorityVoting -# Initialize agents +# Initialize multiple agents with different specialties agents = [ Agent( - agent_name="Devin", - system_prompt=( - "Autonomous agent that can interact with humans and other" - " agents. Be Helpful and Kind. 
Use the tools provided to" - " assist the user. Return all code in markdown format." - ), - llm=llm, - max_loops="auto", - autosave=True, - dashboard=False, - streaming_on=True, - verbose=True, - stopping_token="", - interactive=True, - tools=[terminal, browser, file_editor, create_file], - code_interpreter=True, + agent_name="Financial-Analysis-Agent", + agent_description="Personal finance advisor focused on market analysis", + system_prompt="You are a financial advisor specializing in market analysis and investment opportunities.", + max_loops=1, + model_name="gpt-4o" ), Agent( - agent_name="Codex", - system_prompt=( - "An AI coding assistant capable of writing and understanding" - " code snippets in various programming languages." - ), - llm=llm, - max_loops="auto", - autosave=True, - dashboard=False, - streaming_on=True, - verbose=True, - stopping_token="", - interactive=True, - tools=[terminal, browser, file_editor, create_file], - code_interpreter=True, + agent_name="Risk-Assessment-Agent", + agent_description="Risk analysis and portfolio management expert", + system_prompt="You are a risk assessment expert focused on evaluating investment risks and portfolio diversification.", + max_loops=1, + model_name="gpt-4o" ), Agent( - agent_name="Tabnine", - system_prompt=( - "A code completion AI that provides suggestions for code" - " completion and code improvements." 
- ), - llm=llm, - max_loops="auto", - autosave=True, - dashboard=False, - streaming_on=True, - verbose=True, - stopping_token="", - interactive=True, - tools=[terminal, browser, file_editor, create_file], - code_interpreter=True, - ), + agent_name="Tech-Investment-Agent", + agent_description="Technology sector investment specialist", + system_prompt="You are a technology investment specialist focused on AI, emerging tech, and growth opportunities.", + max_loops=1, + model_name="gpt-4o" + ) ] -# Create MajorityVoting instance -majority_voting = MajorityVoting(agents) -# Run the majority voting system with a different task -result = majority_voting.run("Create a new file for a plan to take over the world.") +consensus_agent = Agent( + agent_name="Consensus-Agent", + agent_description="Consensus agent focused on analyzing investment advice", + system_prompt="You are a consensus agent focused on analyzing investment advice and providing a final answer.", + max_loops=1, + model_name="gpt-4o" +) + +# Create majority voting system +majority_voting = MajorityVoting( + name="Investment-Advisory-System", + description="Multi-agent system for investment advice", + agents=agents, + verbose=True, + consensus_agent=consensus_agent +) + +# Run the analysis with majority voting +result = majority_voting.batch_run( + task="Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. 
Please create a table in markdown.", + correct_answer="" # Optional evaluation metric +) + print(result) + + ``` \ No newline at end of file diff --git a/docs/swarms/structs/moa.md b/docs/swarms/structs/moa.md index 82b23330..0a1af8bc 100644 --- a/docs/swarms/structs/moa.md +++ b/docs/swarms/structs/moa.md @@ -1,5 +1,31 @@ # MixtureOfAgents Class Documentation +## Architecture Overview + +```mermaid +graph TD + A[Input Task] --> B[Initialize MixtureOfAgents] + B --> C[Reliability Check] + C --> D[Layer 1: Parallel Agent Execution] + D --> E[Layer 2: Sequential Processing] + E --> F[Layer 3: Parallel Agent Execution] + F --> G[Final Aggregator Agent] + G --> H[Output Response] + + subgraph "Agent Layer Details" + I[Agent 1] --> J[Agent Results] + K[Agent 2] --> J + L[Agent N] --> J + end + + subgraph "Processing Flow" + M[Previous Context] --> N[Current Task] + N --> O[Agent Processing] + O --> P[Aggregation] + P --> M + end +``` + ## Overview The `MixtureOfAgents` class represents a mixture of agents operating within a swarm. The workflow of the swarm follows a parallel → sequential → parallel → final output agent process. This implementation is inspired by concepts discussed in the paper: [https://arxiv.org/pdf/2406.04692](https://arxiv.org/pdf/2406.04692). @@ -130,6 +156,89 @@ history = moe_swarm.run(task="Solve this problem.") print(history) ``` +### `reliability_check` + +```python +def reliability_check(self) -> None: +``` + +#### Description + +Performs validation checks on the Mixture of Agents class to ensure all required components are properly configured. Raises ValueError if any checks fail. 
+ +#### Validation Checks: +- Verifies reference agents are provided +- Validates aggregator agent exists +- Checks aggregator system prompt is set +- Ensures layers count is valid (> 0) + +### `_get_final_system_prompt` + +```python +def _get_final_system_prompt(self, system_prompt: str, results: List[str]) -> str: +``` + +#### Description + +Internal method that constructs a system prompt for subsequent layers by incorporating previous responses. + +#### Parameters + +| Parameter | Type | Description | +|-----------|------|-------------| +| `system_prompt` | `str` | The initial system prompt | +| `results` | `List[str]` | List of previous responses | + +#### Returns + +| Type | Description | +|------|-------------| +| `str` | Combined system prompt with previous responses | + +### `run_batched` + +```python +def run_batched(self, tasks: List[str]) -> List[str]: +``` + +#### Description + +Processes multiple tasks sequentially, returning a list of responses. + +#### Parameters + +| Parameter | Type | Description | +|-----------|------|-------------| +| `tasks` | `List[str]` | List of tasks to process | + +#### Returns + +| Type | Description | +|------|-------------| +| `List[str]` | List of responses for each task | + +### `run_concurrently` + +```python +def run_concurrently(self, tasks: List[str]) -> List[str]: +``` + +#### Description + +Processes multiple tasks concurrently using a ThreadPoolExecutor, optimizing for parallel execution. 
+ +#### Parameters + +| Parameter | Type | Description | +|-----------|------|-------------| +| `tasks` | `List[str]` | List of tasks to process concurrently | + +#### Returns + +| Type | Description | +|------|-------------| +| `List[str]` | List of responses for each task | + ## Detailed Explanation ### Initialization @@ -382,4 +491,113 @@ The `MixtureOfAgents` framework provides a solid foundation for further extensio - **Advanced Agent Communication**: Enhancing the communication protocols between agents to allow for more sophisticated information exchange. - **Integration with Other Frameworks**: Seamlessly integrating with other machine learning or data processing frameworks to leverage their capabilities within the swarm architecture. -In conclusion, the `MixtureOfAgents` class represents a versatile and efficient solution for orchestrating multi-agent systems, facilitating complex task execution through its structured and layered approach. By harnessing the power of parallel and sequential processing, it opens up new possibilities for tackling intricate problems across various domains. \ No newline at end of file +In conclusion, the `MixtureOfAgents` class represents a versatile and efficient solution for orchestrating multi-agent systems, facilitating complex task execution through its structured and layered approach. By harnessing the power of parallel and sequential processing, it opens up new possibilities for tackling intricate problems across various domains. 
+ +## Additional Examples + +### Example 4: Batch Processing + +```python +from swarms import MixtureOfAgents, Agent +from swarm_models import OpenAIChat + +# Initialize agents as in previous examples +director = Agent( + agent_name="Director", + system_prompt="Directs the tasks for the accountants", + llm=OpenAIChat(), + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + stopping_token="", + state_save_file_type="json", + saved_state_path="director.json", +) + +accountant1 = Agent( + agent_name="Accountant1", + system_prompt="Prepares financial statements", + llm=OpenAIChat(), + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + stopping_token="", + state_save_file_type="json", + saved_state_path="accountant1.json", +) + +accountant2 = Agent( + agent_name="Accountant2", + system_prompt="Audits financial records", + llm=OpenAIChat(), + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + stopping_token="", + state_save_file_type="json", + saved_state_path="accountant2.json", +) + +# Initialize MixtureOfAgents +moe_swarm = MixtureOfAgents( + agents=[director, accountant1, accountant2], + final_agent=director +) + +# Process multiple tasks in batch +tasks = [ + "Analyze Q1 financial statements", + "Review tax compliance", + "Prepare budget forecast" +] +results = moe_swarm.run_batched(tasks) +for task, result in zip(tasks, results): + print(f"Task: {task}\nResult: {result}\n") +``` + +### Example 5: Concurrent Processing + +```python +from swarms import MixtureOfAgents, Agent +from swarm_models import OpenAIChat + +# Initialize agents as before +# ... agent initialization code ... 
+ +# Initialize MixtureOfAgents +moe_swarm = MixtureOfAgents( + agents=[director, accountant1, accountant2], + final_agent=director +) + +# Process multiple tasks concurrently +tasks = [ + "Generate monthly report", + "Audit expense claims", + "Update financial projections", + "Review investment portfolio" +] +results = moe_swarm.run_concurrently(tasks) +for task, result in zip(tasks, results): + print(f"Task: {task}\nResult: {result}\n") +``` + +## Advanced Features + +### Context Preservation + +The `MixtureOfAgents` class maintains context between iterations when running multiple loops. Each subsequent iteration receives the context from previous runs, allowing for more sophisticated and context-aware processing. + +### Asynchronous Processing + +The class implements asynchronous processing internally using Python's `asyncio`, enabling efficient handling of concurrent operations and improved performance for complex workflows. + +### Telemetry and Logging + +Built-in telemetry and logging capabilities help track agent performance and maintain detailed execution records: +- Automatic logging of agent outputs +- Structured data capture using Pydantic models +- JSON-formatted output options \ No newline at end of file diff --git a/docs/swarms/structs/multi_process_workflow.md b/docs/swarms/structs/multi_process_workflow.md deleted file mode 100644 index d89134d6..00000000 --- a/docs/swarms/structs/multi_process_workflow.md +++ /dev/null @@ -1,124 +0,0 @@ -# MultiProcessWorkflow Documentation - - -The `MultiProcessWorkflow` class provides a framework for executing tasks concurrently using multiple processes. This class leverages Python's `multiprocessing` module to parallelize task execution, thereby enhancing performance and efficiency. It includes features such as automatic task retry on failure and optional autosaving of results. This documentation details the class, its parameters, attributes, methods, and usage examples. 
- -## Class Definition - -### `MultiProcessWorkflow` - - -## Parameters - -| Parameter | Type | Default | Description | -|---------------|---------------------|---------|---------------------------------------------------------------| -| `max_workers` | `int` | `5` | The maximum number of workers to use for parallel processing. | -| `autosave` | `bool` | `True` | Flag indicating whether to automatically save the workflow. | -| `agents` | `Sequence[Agent]` | `None` | A list of Agent objects representing the workflow agents. | -| `*args` | `tuple` | | Additional positional arguments. | -| `**kwargs` | `dict` | | Additional keyword arguments. | - -## Attributes - -| Attribute | Type | Description | -|-----------------|---------------------|--------------------------------------------------------------| -| `max_workers` | `int` | The maximum number of workers to use for parallel processing.| -| `autosave` | `bool` | Flag indicating whether to automatically save the workflow. | -| `agents` | `Sequence[Agent]` | A list of Agent objects representing the workflow agents. | - -## Methods - -### `execute_task` - -#### Description - -The `execute_task` method executes a given task and handles any exceptions that may occur during execution. If agents are defined, it will execute the task using each agent in sequence. - -#### Usage Example - -```python -# Define a task -task = Task() - -# Execute the task -workflow = MultiProcessWorkflow() -result = workflow.execute_task(task) -print(result) -``` - -### `run` - -#### Description - -The `run` method executes the workflow by running the given task using multiple processes. It manages the task execution using a process pool and collects the results. 
- -#### Usage Example - -```python -from swarms.structs.multi_process_workflow import MultiProcessingWorkflow -from swarms.structs.task import Task -from datetime import datetime -from time import sleep - -# Define a simple task -def simple_task(): - sleep(1) - return datetime.now() - -# Create a task object -task = Task( - name="Simple Task", - execute=simple_task, - priority=1, -) - -# Create a workflow with the task -workflow = MultiProcessWorkflow(max_workers=3, autosave=True, agents=[agent1, agent2]) - -# Run the workflow -results = workflow.run(task) - -# Print the results -print(results) -``` - -## Detailed Functionality and Usage - -### Initialization - -When an instance of `MultiProcessWorkflow` is created, it initializes the following: - -- **max_workers**: Sets the maximum number of processes that can run concurrently. -- **autosave**: Determines if the workflow results should be saved automatically. -- **agents**: Accepts a list of agents that will perform the tasks. - -### Running Tasks - -The `run` method performs the following steps: - -1. **Initialize Results and Manager**: Creates a list to store results and a `Manager` to manage shared state between processes. -2. **Initialize Process Pool**: Creates a pool of worker processes. -3. **Submit Tasks**: Iterates over the agents, submitting tasks to the pool for execution and collecting the results. -4. **Wait for Completion**: Waits for all tasks to complete and collects the results. -5. **Return Results**: Returns the list of results from all executed tasks. - -### Autosave Task Result - -Although the autosave functionality is mentioned in the parameters, it is not explicitly defined in the given code. The implementation for autosaving should be added based on the specific requirements of the application. - -## Additional Information and Tips - -- **Process Safety**: The use of `Manager` ensures that the list of results is managed safely across multiple processes. 
-- **Logging**: The class uses the `logger` module to log information about task execution, retries, and failures. -- **Error Handling**: The retry mechanism in the `execute_task` method helps in handling transient errors by attempting to re-execute failed tasks. - -## References and Resources - -For more information on multiprocessing in Python, refer to the following resources: - -- [Python Multiprocessing Documentation](https://docs.python.org/3/library/multiprocessing.html) -- [Python Logging Documentation](https://docs.python.org/3/library/logging.html) - ---- - -By following this detailed documentation, users can effectively understand and utilize the `MultiProcessWorkflow` class to execute tasks concurrently with multiple processes. The examples provided help in demonstrating the practical usage of the class. \ No newline at end of file diff --git a/docs/swarms/structs/multi_processing_workflow.md b/docs/swarms/structs/multi_processing_workflow.md deleted file mode 100644 index 320667d4..00000000 --- a/docs/swarms/structs/multi_processing_workflow.md +++ /dev/null @@ -1,204 +0,0 @@ -# MultiProcessWorkflow Documentation - -The `MultiProcessWorkflow` class extends the `BaseWorkflow` to support parallel processing using multiple workers. This class is designed to efficiently execute tasks concurrently, leveraging the power of multi-processing to enhance performance and scalability. - -### Key Concepts - -- **Parallel Processing**: Utilizing multiple workers to execute tasks concurrently. -- **Workflow Management**: Handling the execution of tasks in a structured workflow. -- **Agents**: Entities responsible for executing tasks. - -## Attributes - -### Arguments - -| Argument | Type | Default | Description | -|--------------|---------------------|---------|-------------| -| `max_workers`| `int` | `5` | The maximum number of workers to use for parallel processing. | -| `autosave` | `bool` | `True` | Flag indicating whether to automatically save the workflow. 
| -| `agents` | `Sequence[Agent]` | `None` | A list of agents participating in the workflow. | -| `*args` | | | Additional positional arguments. | -| `**kwargs` | | | Additional keyword arguments. | - -### Attributes - -| Attribute | Type | Description | -|--------------|---------------------|-------------| -| `max_workers`| `int` | The maximum number of workers to use for parallel processing. | -| `autosave` | `bool` | Flag indicating whether to automatically save the workflow. | -| `agents` | `Sequence[Agent]` | A list of agents participating in the workflow. | - -## Methods - -### __init__ - -Initializes the `MultiProcessWorkflow` with the given parameters. - -**Examples:** - -```python -from swarms.structs.agent import Agent -from swarms.structs.task import Task -from swarms.structs.multi_process_workflow import MultiProcessWorkflow - -agents = [Agent(name="Agent 1"), Agent(name="Agent 2")] -tasks = [Task(name="Task 1", execute=lambda: "result1"), Task(name="Task 2", execute=lambda: "result2")] - -workflow = MultiProcessWorkflow(max_workers=3, agents=agents, tasks=tasks) -``` - -### execute_task - -Executes a task and handles exceptions. - -**Arguments:** - -| Parameter | Type | Description | -|-----------|------|-------------| -| `task` | `str` | The task to execute. | -| `*args` | | Additional positional arguments for the task execution. | -| `**kwargs`| | Additional keyword arguments for the task execution. | - -**Returns:** - -| Return Type | Description | -|-------------|-------------| -| `Any` | The result of the task execution. | - -**Examples:** - -```python -result = workflow.execute_task(task="Sample Task") -print(result) -``` - -### run - -Runs the workflow. - -**Arguments:** - -| Parameter | Type | Description | -|-----------|------|-------------| -| `task` | `str` | The task to run. | -| `*args` | | Additional positional arguments for the task execution. | -| `**kwargs`| | Additional keyword arguments for the task execution. 
| - -**Returns:** - -| Return Type | Description | -|-------------|-------------| -| `List[Any]` | The results of all executed tasks. | - -**Examples:** - -```python -results = workflow.run(task="Sample Task") -print(results) -``` - -### Additional Examples - -#### Example 1: Simple Task Execution - -```python -from swarms import Agent, Task, MultiProcessWorkflow, OpenAIChat -from datetime import datetime -from time import sleep - -import os -from dotenv import load_dotenv - -# Load the environment variables -load_dotenv() - - -# Define a function to be used as the action -def my_action(): - print("Action executed") - - -# Define a function to be used as the condition -def my_condition(): - print("Condition checked") - return True - - -# Create an agent -agent = Agent( - llm=OpenAIChat(openai_api_key=os.environ["OPENAI_API_KEY"]), - max_loops=1, - dashboard=False, -) - -# Create a task -task = Task( - description=( - "Generate a report on the top 3 biggest expenses for small" - " businesses and how businesses can save 20%" - ), - agent=agent, -) - -# Create a workflow with the task -workflow = MultiProcessWorkflow(tasks=[task]) - -# Run the workflow -results = workflow.run(task) -print(results) -``` - -#### Example 2: Workflow with Multiple Agents - -```python -from swarms import Agent, Task, MultiProcessWorkflow - -# Define tasks -def task1(): - return "Task 1 result" - -def task2(): - return "Task 2 result" - -# Create agents -agent1 = Agent(name="Agent 1", llm=OpenAIChat()) -agent2 = Agent(name="Agent 2", llm=OpenAIChat()) - -# Create tasks -task_1 = Task(name="Task 1", execute=task1) -task_2 = Task(name="Task 2", execute=task2) - -# Create a workflow -workflow = MultiProcessWorkflow(agents=[agent1, agent2], tasks=[task_1, task_2]) - -# Run the workflow -results = workflow.run(task="Example Task") -print(results) -``` - -#### Example 3: Customizing Max Workers - -```python -from swarms import Agent, Task, MultiProcessWorkflow, OpenAIChat - -# Define a task -def 
example_task(): - return "Task result" - -# Create an agent -agent = Agent(name="Agent 1", llm=OpenAIChat()) - -# Create a task -task = Task(name="Example Task", execute=example_task) - -# Create a workflow with custom max workers -workflow = MultiProcessWorkflow(max_workers=10, agents=[agent], tasks=[task]) - -# Run the workflow -results = workflow.run(task="Example Task") -print(results) -``` - -## Summary - -The `MultiProcessWorkflow` class provides a powerful framework for managing and executing tasks using multiple workers. With support for parallel processing, customizable workflows, and detailed logging, it is an ideal tool for complex task execution scenarios. This class enhances performance and scalability, making it suitable for a wide range of applications that require efficient task management. \ No newline at end of file diff --git a/example.py b/example.py index db042f11..b4002101 100644 --- a/example.py +++ b/example.py @@ -28,5 +28,4 @@ agent = Agent( agent.run( "Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.", - all_cores=True, ) diff --git a/examples/chart_swarm.py b/examples/chart_swarm.py new file mode 100644 index 00000000..ad1025e1 --- /dev/null +++ b/examples/chart_swarm.py @@ -0,0 +1,388 @@ +import os +from dataclasses import dataclass +from typing import Tuple +from litellm import completion +from loguru import logger +from swarms import Agent + +EXTRACTION_PROMPT = """ +You are a specialized Chart2Table extraction agent that converts visual charts into precise textual descriptions. 
+ +Output Format: +[Chart Type] +Type: {bar|line|pie|scatter|combination} +Title: {chart title} +X-Axis: {label and scale} +Y-Axis: {label and scale} + +[Data Series] +Name: {series name} +Values: {comma-separated list of values} +{repeat for each series} + +[Annotations] +- {list any markers, gridlines, legends} +- {note any data gaps or anomalies} + +Guidelines: +1. Maintain exact numerical precision +2. List ALL data points in order +3. Note any gaps, outliers or special patterns +4. Describe axes scales (linear/log) and units +5. Include legends and series names verbatim +6. Note any data point annotations or markers +7. Describe chart elements spatially (top-left, center, etc) +8. Include color and style information if relevant +9. Note relationships between multiple series +10. Flag any data quality or readability issues""" + +REFORMULATION_PROMPT = """You are an Answer Reformulation specialist that breaks down complex analytical statements into atomic, verifiable claims. + +Output Format: +[Core Claims] +1. {single fact with exact numbers} +2. {another atomic fact} +{continue for all core claims} + +[Supporting Context] +1. {relevant context that supports core claims} +2. {additional contextual information} +{continue for all context} + +[Assumptions] +1. {implicit assumption made} +2. {another assumption} +{continue for all assumptions} + +Guidelines: +1. Each claim must be independently verifiable +2. Use exact numbers, never round or approximate +3. Split compound statements into atomic facts +4. Make implicit comparisons explicit +5. Note temporal relationships clearly +6. Include units with all measurements +7. Flag any uncertainty or approximations +8. Note data source limitations +9. Preserve calculation steps +10. Maintain logical dependencies""" + +CAPTIONING_PROMPT = """You are an Entity Captioning specialist that generates rich contextual descriptions of chart elements. 
+ +Output Format: +[Data Points] +{x,y}: {detailed description of point significance} +{continue for key points} + +[Trends] +- {description of overall pattern} +- {notable sub-patterns} +{continue for all trends} + +[Relationships] +- {correlation between variables} +- {causation if evident} +{continue for all relationships} + +[Context] +- {broader context for interpretation} +- {relevant external factors} +{continue for all context} + +Guidelines: +1. Describe both local and global patterns +2. Note statistical significance of changes +3. Identify cyclic or seasonal patterns +4. Flag outliers and anomalies +5. Compare relative magnitudes +6. Note rate of change patterns +7. Describe distribution characteristics +8. Highlight key inflection points +9. Note data clustering patterns +10. Include domain-specific insights""" + +PREFILTER_PROMPT = """You are a Pre-filtering specialist that identifies relevant chart elements for verification. + +Output Format: +[Critical Elements] +1. {element}: Score {0-10} + Evidence: {why this supports claims} +{continue for all relevant elements} + +[Supporting Elements] +1. {element}: Score {0-10} + Context: {how this adds context} +{continue for all supporting elements} + +[Relevance Chain] +1. {claim} -> {element} -> {evidence} +{continue for all connections} + +Guidelines: +1. Score relevance 0-10 with detailed rationale +2. Build explicit evidence chains +3. Note both direct and indirect support +4. Consider temporal relevance +5. Account for data relationships +6. Note confidence levels +7. Include contextual importance +8. Consider alternative interpretations +9. Note missing evidence +10. Explain filtering decisions""" + +RERANK_PROMPT = """You are a Re-ranking specialist that orders evidence by strength and relevance. + +Output Format: +[Primary Evidence] +1. {element} - Score: {0-10} + Strength: {detailed justification} +{continue for top evidence} + +[Supporting Evidence] +1. 
{element} - Score: {0-10} + Context: {how this reinforces primary evidence} +{continue for supporting evidence} + +[Evidence Chains] +1. {claim} -> {primary} -> {supporting} -> {conclusion} +{continue for all chains} + +Guidelines: +1. Use explicit scoring criteria +2. Consider evidence independence +3. Note corroborating elements +4. Account for evidence quality +5. Consider contradictory evidence +6. Note confidence levels +7. Explain ranking decisions +8. Build complete evidence chains +9. Note gaps in evidence +10. Consider alternative interpretations""" + +LOCALIZATION_PROMPT = """You are a Cell Localization specialist that precisely maps data to visual elements. + +Output Format: +[Element Locations] +1. Type: {bar|line|point|label} + Position: {x1,y1,x2,y2} + Value: {associated data value} + Confidence: {0-10} +{continue for all elements} + +[Spatial Relationships] +- {relative positions} +- {alignment patterns} +{continue for all relationships} + +[Visual Context] +- {surrounding elements} +- {reference points} +{continue for context} + +Guidelines: +1. Use normalized coordinates (0-1) +2. Note element boundaries precisely +3. Include confidence scores +4. Note spatial relationships +5. Account for overlapping elements +6. Consider chart type constraints +7. Note alignment patterns +8. Include reference points +9. Note visual hierarchies +10. 
@dataclass
class ChartElement:
    # element_type: kind of chart element, e.g. "bar", "line", "point", "label"
    # bbox: (x1, y1, x2, y2) bounding box — normalized [0, 1] coordinates
    #       per the localization prompt contract
    # confidence: localization confidence score
    element_type: str
    bbox: Tuple[float, float, float, float]
    confidence: float


class VisionAPI:
    """Minimal wrapper around litellm's vision-capable chat completions.

    Encodes local image files as base64 data URLs (remote URLs are passed
    through unchanged) and sends a combined text + image message to the
    configured model.
    """

    def __init__(
        self,
        model_name: str = "gpt-4o",
        max_tokens: int = 1000,
        temperature: float = 0.5,
    ):
        # Fail fast with a clear message. The original line
        #   os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY")
        # raised an opaque `TypeError: str expected, not NoneType` whenever
        # the variable was unset, because os.environ values must be strings.
        if not os.getenv("OPENAI_API_KEY"):
            raise EnvironmentError(
                "OPENAI_API_KEY environment variable is not set."
            )
        self.model_name = model_name
        self.max_tokens = max_tokens
        self.temperature = temperature

    def encode_image(self, img: str) -> str:
        """Return *img* unchanged if it is an http(s) URL, otherwise read the
        local file and return it as a base64 ``data:image/png`` URL."""
        if img.startswith("http"):
            return img
        import base64

        with open(img, "rb") as image_file:
            encoded_string = base64.b64encode(
                image_file.read()
            ).decode("utf-8")
        return f"data:image/png;base64,{encoded_string}"

    def run(self, task: str, img: str) -> str:
        """Send *task* plus the image *img* to the model and return the
        model's text reply."""
        img = self.encode_image(img)
        response = completion(
            model=self.model_name,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": task},
                        {
                            "type": "image_url",
                            "image_url": {"url": img},
                        },
                    ],
                }
            ],
            max_tokens=self.max_tokens,
            temperature=self.temperature,
        )
        return response.choices[0].message.content


class ChartCitor:
    """Multi-agent pipeline that answers questions about charts with visual
    citations.

    Stages: chart -> table extraction, answer reformulation into atomic
    facts, element captioning, evidence pre-filtering and re-ranking, and
    finally element localization on the chart image.
    """

    def __init__(
        self,
        model_name: str = "gpt-4o",
        saved_state_path: str = "chartcitor_state.json",
        max_retries: int = 3,
        max_loops: int = 1,
    ):
        logger.info(
            f"Initializing ChartCitor with model {model_name}"
        )
        # Fix: forward the requested model name to the vision backend.
        # Previously `VisionAPI()` was constructed with no arguments, so the
        # `model_name` parameter was silently ignored.
        model = VisionAPI(model_name=model_name)
        # Keep the remaining configuration on the instance instead of
        # discarding it (the original accepted but never stored these).
        self.saved_state_path = saved_state_path
        self.max_retries = max_retries
        self.max_loops = max_loops

        self.extraction_agent = Agent(
            agent_name="Chart2Table-Agent",
            system_prompt=EXTRACTION_PROMPT,
            llm=model,
            max_loops=1,
        )

        self.reformulation_agent = Agent(
            agent_name="Answer-Reformulation-Agent",
            system_prompt=REFORMULATION_PROMPT,
            llm=model,
            max_loops=1,
        )

        self.captioning_agent = Agent(
            agent_name="Entity-Captioning-Agent",
            system_prompt=CAPTIONING_PROMPT,
            llm=model,
            max_loops=1,
        )

        self.prefilter_agent = Agent(
            agent_name="LLM-Prefilter-Agent",
            system_prompt=PREFILTER_PROMPT,
            llm=model,
            max_loops=1,
        )

        self.rerank_agent = Agent(
            agent_name="LLM-Rerank-Agent",
            system_prompt=RERANK_PROMPT,
            llm=model,
            max_loops=1,
        )

        self.localization_agent = Agent(
            agent_name="Cell-Localization-Agent",
            system_prompt=LOCALIZATION_PROMPT,
            llm=model,
            max_loops=1,
        )

    def extract_table(self, chart_image: str) -> str:
        """Convert the chart image into a structured textual table."""
        logger.info("Extracting table from chart")
        return self.extraction_agent.run(
            "Extract and describe the data from this chart following the specified format.",
            img=chart_image,
        )

    def reformulate_answer(
        self, answer: str, table_data: str, chart_image: str
    ) -> str:
        """Break *answer* into atomic, verifiable claims."""
        logger.info("Reformulating answer into atomic facts")
        return self.reformulation_agent.run(
            f"Break this answer into atomic facts:\n{answer}\n\nTable data:\n{table_data}",
            img=chart_image,
        )

    def generate_captions(
        self, table_data: str, chart_image: str
    ) -> str:
        """Generate contextual captions for the extracted chart data."""
        logger.info("Generating captions for chart elements")
        return self.captioning_agent.run(
            f"Generate descriptive captions for this data:\n{table_data}",
            img=chart_image,
        )

    def retrieve_evidence(
        self,
        facts: str,
        table_data: str,
        captions: str,
        chart_image: str,
    ) -> str:
        """Pre-filter relevant chart elements, then re-rank them by how
        strongly they support *facts*."""
        logger.info("Retrieving supporting evidence")
        filtered = self.prefilter_agent.run(
            f"Identify relevant elements for:\nFacts:\n{facts}\n\nData:\n{table_data}\n\nCaptions:\n{captions}",
            img=chart_image,
        )

        return self.rerank_agent.run(
            f"Rank these elements by relevance:\n{filtered}\nFor facts:\n{facts}",
            img=chart_image,
        )

    def localize_elements(
        self, chart_image: str, evidence: str
    ) -> str:
        """Map the ranked evidence back to positions on the chart image."""
        logger.info("Localizing chart elements")
        return self.localization_agent.run(
            f"Describe the location of these elements:\n{evidence}",
            img=chart_image,
        )

    def run(
        self, chart_image: str, question: str, answer: str
    ) -> str:
        """Run the full citation pipeline for one (chart, question, answer)
        triple and return a formatted report with facts, evidence, and
        visual citations."""
        logger.info(f"Processing question: {question}")

        table_data = self.extract_table(chart_image)
        facts = self.reformulate_answer(
            answer, table_data, chart_image
        )
        captions = self.generate_captions(table_data, chart_image)
        evidence = self.retrieve_evidence(
            facts, table_data, captions, chart_image
        )
        citations = self.localize_elements(chart_image, evidence)

        return f"""Analysis Results:

Facts:
{facts}

Evidence:
{evidence}

Visual Citations:
{citations}
"""


if __name__ == "__main__":
    chartcitor = ChartCitor()
    result = chartcitor.run(
        chart_image="chart.png",
        question="Analyze this chart of solana price and volume over time. What is the highest volume day?",
        answer="203",
    )
    print(result)
def clean_markdown(text: str) -> str:
    """
    Removes all markdown symbols from text.

    Args:
        text (str): Text containing markdown symbols

    Returns:
        str: Text with markdown symbols removed
    """
    # Note: "```markdown" must be stripped before the bare "```" fence so
    # the language tag does not survive as the word "markdown".
    markdown_symbols = [
        "```markdown",
        "```",
        "#",
        "*",
        "_",
        "`",
        ">",
        "-",
        "+",
        "[",
        "]",
        "(",
        ")",
        "|",
    ]
    cleaned_text = text
    for symbol in markdown_symbols:
        cleaned_text = cleaned_text.replace(symbol, "")
    return cleaned_text.strip()


class MortgageUnderwritingSwarm:
    """Three-stage mortgage underwriting pipeline: document analysis ->
    risk evaluation -> final underwriting decision, with autosaved
    intermediate results and PDF/JSON report output."""

    def __init__(
        self,
        user_id: Union[str, None] = None,
        save_directory: str = "./autosave",
        return_format: str = "pdf",
    ):
        """
        Initialize the MortgageUnderwritingSwarm with the necessary Agents.

        Args:
            user_id (str | None): Identifier for this run. If omitted, a
                fresh UUID hex is generated per instance. (The original
                default called the generator once at class-definition time,
                so every instance silently shared the same id.)
            save_directory (str): Directory where intermediate results and
                final documents will be autosaved.
            return_format (str): Default output format, "pdf" or "json".
        """
        self.user_id = user_id if user_id is not None else uuid.uuid4().hex
        self.save_directory = save_directory
        self.return_format = return_format
        os.makedirs(self.save_directory, exist_ok=True)

        # -------------------------------
        # 1) Document Analyzer Agent
        # -------------------------------
        self.document_agent = Agent(
            agent_name="Document-Analyzer-Agent",
            model_name="gpt-4o-mini",
            max_loops=1,
            streaming_on=True,
        )
        self.document_prompt = """
        You are a highly experienced Mortgage Document Analysis Expert with deep knowledge of federal and state mortgage regulations. Your task is to:

        1. Parse and extract key data from unstructured documents (PDF or text) while ensuring compliance with:
        - Truth in Lending Act (TILA) requirements
        - Real Estate Settlement Procedures Act (RESPA) guidelines
        - Fair Credit Reporting Act (FCRA) standards
        - Equal Credit Opportunity Act (ECOA) requirements

        2. Validate data consistency and regulatory compliance for:
        - Income verification (including all sources of income)
        - Credit scores and credit history
        - Property details and appraisal information
        - Debt obligations and payment history
        - Employment verification
        - Asset documentation
        - Identity verification documents

        3. Highlight any discrepancies, red flags, or potential compliance violations, including:
        - Inconsistencies in reported income vs documentation
        - Suspicious patterns in bank statements
        - Potential identity theft indicators
        - Missing required regulatory disclosures
        - Fair lending concerns
        - Anti-money laundering (AML) red flags

        4. Provide a comprehensive, well-structured summary that includes:
        - All key findings organized by category
        - Compliance checklist results
        - Documentation completeness assessment
        - Regulatory disclosure verification
        - Quality control notes

        5. Clearly indicate any missing or ambiguous information required by:
        - Federal regulations
        - State-specific requirements
        - Agency guidelines (FHA, VA, Fannie Mae, Freddie Mac)
        - Internal compliance policies

        6. Format output in a standardized structure that:
        - Facilitates automated compliance checks
        - Enables clear audit trails
        - Supports regulatory reporting requirements
        - Can be easily consumed by subsequent agents
        """

        # -------------------------------
        # 2) Risk Evaluator Agent
        # -------------------------------
        self.risk_agent = Agent(
            agent_name="Risk-Evaluator-Agent",
            model_name="gpt-4o-mini",
            max_loops=1,
            streaming_on=True,
        )
        self.risk_prompt = """
        You are an expert Risk Evaluator for mortgage applications with comprehensive knowledge of regulatory compliance. Your responsibilities:

        1. Conduct thorough risk assessment in accordance with:
        - Dodd-Frank Act requirements
        - Consumer Financial Protection Bureau (CFPB) guidelines
        - Federal Reserve Board regulations
        - Agency-specific requirements (FHA, VA, Fannie Mae, Freddie Mac)

        2. Evaluate key risk factors including:
        - Debt-to-income ratio (DTI) compliance with QM rules
        - Credit history analysis per FCRA guidelines
        - Property valuation in line with USPAP standards
        - Income stability and verification per agency requirements
        - Assets and reserves adequacy
        - Employment history and verification
        - Occupancy risk assessment
        - Property type and use restrictions

        3. Calculate and assign risk scores:
        - Overall application risk score (1-10 scale)
        - Individual component risk scores
        - Regulatory compliance risk assessment
        - Fraud risk indicators
        - Default risk probability

        4. Identify and document:
        - High-risk elements requiring additional scrutiny
        - Potential regulatory compliance issues
        - Required compensating factors
        - Secondary market eligibility concerns
        - Fair lending considerations

        5. Recommend risk mitigation strategies:
        - Additional documentation requirements
        - Income/asset verification needs
        - Compensating factor documentation
        - Alternative qualification approaches
        - Regulatory compliance remediation steps

        6. Generate comprehensive risk analysis including:
        - Detailed risk assessment findings
        - Compliance verification results
        - Supporting documentation requirements
        - Clear justification for all conclusions
        - Regulatory requirement adherence confirmation
        """

        # -------------------------------
        # 3) Mortgage Underwriter Agent
        # -------------------------------
        self.underwriter_agent = Agent(
            agent_name="Mortgage-Underwriter-Agent",
            model_name="gpt-4o-mini",
            max_loops=1,
            streaming_on=True,
        )
        self.underwriter_prompt = """
        You are a seasoned Mortgage Underwriter with expertise in regulatory compliance and industry standards. Your role is to:

        1. Make final underwriting decisions while ensuring compliance with:
        - Qualified Mortgage (QM) and Ability-to-Repay (ATR) rules
        - Fair lending laws (ECOA, FHA, HMDA)
        - Agency guidelines (FHA, VA, Fannie Mae, Freddie Mac)
        - State-specific lending requirements
        - Internal credit policies and procedures

        2. Review and synthesize:
        - Document Analyzer findings
        - Risk Evaluator assessments
        - Compliance verification results
        - Quality control checks
        - Regulatory requirements
        - Secondary market guidelines

        3. Determine appropriate decision category:
        - Approved
        - Conditionally Approved (with specific conditions)
        - Denied (with detailed adverse action notice requirements)
        - Counteroffer recommendations
        - Alternative program suggestions

        4. For all decisions, provide:
        - Clear written justification
        - Regulatory compliance confirmation
        - Required disclosures identification
        - Adverse action notices if required
        - Fair lending analysis documentation
        - Secondary market eligibility determination

        5. For conditional approvals, specify:
        - Required documentation
        - Timeline requirements
        - Regulatory compliance conditions
        - Prior-to-funding conditions
        - Post-closing requirements
        - Quality control conditions

        6. Generate comprehensive decision report including:
        - Detailed underwriting analysis
        - Compliance verification results
        - Supporting documentation list
        - Condition status tracking
        - Regulatory requirement satisfaction
        - Clear audit trail documentation

        7. Ensure all decisions adhere to:
        - Fair lending requirements
        - Anti-discrimination laws
        - UDAAP regulations
        - State and federal disclosure requirements
        - Agency and investor guidelines
        - Internal policies and procedures
        """

    # --------------------------------------------------------------------------
    # Utility Methods
    # --------------------------------------------------------------------------
    def pdf_to_text(self, pdf_file_path: str) -> str:
        """
        Converts a PDF file to a string by extracting its text content.

        Args:
            pdf_file_path (str): The path to the PDF file.

        Returns:
            str: The extracted text from the PDF.
        """
        text_content = []
        with open(pdf_file_path, "rb") as f:
            reader = PyPDF2.PdfReader(f)
            for page in reader.pages:
                # extract_text() may return None for image-only pages.
                page_text = page.extract_text() or ""
                text_content.append(page_text)
        return "\n".join(text_content)

    def autosave_result(
        self, result_data: str, filename: str
    ) -> None:
        """
        Autosaves intermediate or final results to a text file in the
        designated directory.

        Args:
            result_data (str): The data to be written to the file.
            filename (str): The desired filename (without path).
        """
        full_path = os.path.join(self.save_directory, filename)
        with open(full_path, "w", encoding="utf-8") as file:
            file.write(result_data)

    def generate_pdf_report(
        self, content: str, pdf_path: str
    ) -> None:
        """
        Generates a simple PDF report from text content using ReportLab.

        Args:
            content (str): The textual content for the PDF.
            pdf_path (str): Where to save the generated PDF.
        """
        # (Removed a dead `BytesIO()` call from the original: the buffer
        # was constructed and immediately discarded.)
        c = canvas.Canvas(pdf_path, pagesize=LETTER)
        width, height = LETTER

        # Simple text wrap by splitting lines
        lines = clean_markdown(content).split("\n")
        current_height = height - 50  # top margin

        for line in lines:
            # If the line is too long, wrap it manually (simple approach)
            max_chars = 90  # approx number of characters per line for LETTER size
            while len(line) > max_chars:
                c.drawString(50, current_height, line[:max_chars])
                line = line[max_chars:]
                current_height -= 15  # line spacing
            c.drawString(50, current_height, line)
            current_height -= 15

            # Add a new page if we go beyond the margin
            if current_height <= 50:
                c.showPage()
                current_height = height - 50

        c.save()

    # --------------------------------------------------------------------------
    # Core Processing Methods
    # --------------------------------------------------------------------------
    def analyze_documents(self, document_data: str) -> str:
        """
        Runs the Document Analyzer Agent on the given data.

        Args:
            document_data (str): Text representing the mortgage documents.

        Returns:
            str: Structured summary and highlights from the document analysis.
        """
        prompt_input = (
            self.document_prompt
            + "\n\n--- BEGIN DOCUMENTS ---\n"
            + document_data
            + "\n--- END DOCUMENTS ---\n"
        )
        print("Running Document Analyzer Agent...")
        result = self.document_agent.run(prompt_input)
        self.autosave_result(result, "document_analysis.txt")
        return result

    def evaluate_risk(self, document_analysis: str) -> str:
        """
        Runs the Risk Evaluator Agent using the results from the Document
        Analyzer.

        Args:
            document_analysis (str): The structured analysis from the
                Document Analyzer.

        Returns:
            str: Risk analysis including risk score and explanation.
        """
        prompt_input = (
            self.risk_prompt
            + "\n\n--- DOCUMENT ANALYSIS OUTPUT ---\n"
            + document_analysis
            + "\n--- END ANALYSIS OUTPUT ---\n"
        )
        print("Running Risk Evaluator Agent...")
        result = self.risk_agent.run(prompt_input)
        self.autosave_result(result, "risk_evaluation.txt")
        return result

    def underwrite_mortgage(
        self, document_analysis: str, risk_evaluation: str
    ) -> str:
        """
        Runs the Mortgage Underwriter Agent to produce the final
        underwriting decision.

        Args:
            document_analysis (str): Output from the Document Analyzer.
            risk_evaluation (str): Output from the Risk Evaluator.

        Returns:
            str: Final decision text with rationale.
        """
        prompt_input = (
            self.underwriter_prompt
            + "\n\n--- DOCUMENT ANALYSIS SUMMARY ---\n"
            + document_analysis
            + "\n--- RISK EVALUATION REPORT ---\n"
            + risk_evaluation
            + "\n--- END REPORTS ---\n"
        )
        print("Running Mortgage Underwriter Agent...")
        result = self.underwriter_agent.run(prompt_input)
        self.autosave_result(result, "underwriting_decision.txt")
        return result

    # --------------------------------------------------------------------------
    # High-Level Workflow
    # --------------------------------------------------------------------------
    def run(
        self,
        application_data: str,
        return_format: str = "pdf",
        output_filename: str = "UnderwritingDecision",
    ) -> Union[str, Dict]:
        """
        Processes a single mortgage application from documents to final
        underwriting decision. Allows returning data in either PDF or JSON
        format.

        NOTE: the `return_format` argument intentionally shadows the
        instance-level `self.return_format`; the per-call value wins.

        Args:
            application_data (str): The text representation of the
                applicant's documents.
            return_format (str): "pdf" or "json". Defaults to "pdf".
            output_filename (str): Base filename (without extension) for the
                output file.

        Returns:
            Union[str, Dict]: If return_format="json", returns a dict with
            the final data. If return_format="pdf", returns the path of the
            generated PDF.

        Raises:
            ValueError: If `return_format` is neither "pdf" nor "json".
        """
        # Step 1: Document Analysis
        doc_analysis = self.analyze_documents(application_data)

        # Step 2: Risk Evaluation
        risk_eval = self.evaluate_risk(doc_analysis)

        # Step 3: Underwriting Decision
        final_decision = self.underwrite_mortgage(
            doc_analysis, risk_eval
        )

        # Prepare final content (text)
        final_content = (
            "---- Mortgage Underwriting Decision Report ----\n\n"
            "DOCUMENT ANALYSIS:\n" + doc_analysis + "\n\n"
            "RISK EVALUATION:\n" + risk_eval + "\n\n"
            "FINAL UNDERWRITING DECISION:\n" + final_decision + "\n"
        )

        # Return JSON
        if return_format.lower() == "json":
            output_data = {
                "document_analysis": doc_analysis,
                "risk_evaluation": risk_eval,
                "final_decision": final_decision,
            }
            json_path = os.path.join(
                self.save_directory, f"{output_filename}.json"
            )
            with open(json_path, "w", encoding="utf-8") as jf:
                json.dump(output_data, jf, indent=2)
            return output_data

        # Generate PDF
        elif return_format.lower() == "pdf":
            pdf_path = os.path.join(
                self.save_directory, f"{output_filename}.pdf"
            )
            self.generate_pdf_report(final_content, pdf_path)
            return pdf_path

        else:
            raise ValueError(
                "Invalid return format. Choose either 'pdf' or 'json'."
            )

    def run_concurrently(
        self,
        application_data: str,
        return_format: str = "pdf",
        output_filename: str = "UnderwritingDecision",
    ) -> Union[str, Dict]:
        """Run `self.run` on a thread pool and return its results as a list.

        NOTE(review): only a single future is ever submitted, so this adds
        no real concurrency over `run()`; kept as-is for API compatibility
        (it returns a one-element list rather than a bare result).
        """
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=os.cpu_count()
        ) as executor:
            futures = [
                executor.submit(
                    self.run,
                    application_data,
                    return_format,
                    output_filename,
                )
            ]
            results = [
                future.result()
                for future in concurrent.futures.as_completed(futures)
            ]
            return results

    # --------------------------------------------------------------------------
    # Batch Processing
    # --------------------------------------------------------------------------
    def runs_in_batch(
        self,
        list_of_application_data: List[str],
        return_format: str = "pdf",
    ) -> List[Union[str, Dict]]:
        """
        Processes multiple mortgage applications in a batch and returns the
        results as either PDFs or JSON structures for each application.

        Args:
            list_of_application_data (List[str]): A list of string
                representations of mortgage applications (e.g., raw text).
            return_format (str): "pdf" or "json" format for the output files.

        Returns:
            List[Union[str, Dict]]: A list of outputs (either file paths to
            PDFs or JSON dicts).
        """
        results = []
        for idx, application_text in enumerate(
            list_of_application_data, start=1
        ):
            output_filename = f"UnderwritingDecision_{idx}"
            print(f"\n--- Processing Application {idx} ---")
            result = self.run(
                application_data=application_text,
                return_format=return_format,
                output_filename=output_filename,
            )
            results.append(result)
        return results

    # --------------------------------------------------------------------------
    # PDF/Document Conversion Helpers
    # --------------------------------------------------------------------------
    def convert_pdfs_to_texts(
        self, pdf_paths: List[str]
    ) -> List[str]:
        """
        Converts multiple PDFs into text.

        Args:
            pdf_paths (List[str]): A list of file paths to PDF documents.

        Returns:
            List[str]: A list of extracted text contents, one per PDF in the
            list.
        """
        text_results = []
        for pdf_path in pdf_paths:
            print(f"Converting PDF to text: {pdf_path}")
            text_data = self.pdf_to_text(pdf_path)
            text_results.append(text_data)
        return text_results


# ------------------------------------------------------------------------------
# Example Usage (As a Script)
# ------------------------------------------------------------------------------
if __name__ == "__main__":
    # Sample mortgage application text (or read from PDF, DB, etc.)
    sample_application_data = """
    Mortgage Application Data:
    Applicant Name: Jane Doe
    DOB: 02/14/1985
    SSN: 987-65-4321
    Annual Income: $95,000
    Credit Score: 690
    Outstanding Debt: $40,000
    Property Appraisal: $300,000
    Loan Amount Request: $270,000
    Employment: 3+ years at current employer
    Bank Statements & Tax Returns: Provided for the last year
    Extra Notes: Some minor late payments on credit cards in 2020.
    """

    # Initialize the swarm
    swarm = MortgageUnderwritingSwarm(
        save_directory="./autosave_results"
    )

    # 1) Convert PDF to text if needed
    # pdf_text = swarm.pdf_to_text("path_to_some_pdf.pdf")
    # Or convert multiple PDFs in batch
    # texts_from_pdfs = swarm.convert_pdfs_to_texts(["file1.pdf", "file2.pdf"])

    # 2) Process a single application
    final_pdf_path = swarm.run(
        application_data=sample_application_data,
        return_format="pdf",  # or "json"
        output_filename="JaneDoe_UnderwritingDecision",
    )
    print(f"PDF generated at: {final_pdf_path}")

    # 3) Process multiple applications in a batch
    # multiple_apps = [sample_application_data, sample_application_data]
    # batch_results = swarm.runs_in_batch(
    #     multiple_apps,
    #     return_format="json"
    # )
    # Each item in batch_results will be a JSON dict if return_format="json".
    # print("\nBatch Processing Results (JSON):")
    # for result in batch_results:
    #     print(json.dumps(result, indent=2))
+ # print("\nBatch Processing Results (JSON):") + # for result in batch_results: + # print(json.dumps(result, indent=2)) diff --git a/examples/reasoning_duo.py b/examples/reasoning_duo.py new file mode 100644 index 00000000..28c00238 --- /dev/null +++ b/examples/reasoning_duo.py @@ -0,0 +1,156 @@ +import os +from swarms import Agent +from dotenv import load_dotenv + +from swarm_models import OpenAIChat + +load_dotenv() + + +model = OpenAIChat( + model_name="deepseek-ai/DeepSeek-R1-Distill-Llama-70B-free", + openai_api_key=os.getenv("TOGETHER_API_KEY"), + base_url="https://api.together.xyz/v1", +) + +# Define system prompts for reasoning agents +THINKING_AGENT_PROMPT = """You are a sophisticated analytical and strategic thinking agent focused on deep problem analysis and solution design. + +Your core capabilities include: +1. Comprehensive Problem Analysis + - Break down complex problems into constituent elements + - Map relationships and dependencies between components + - Identify root causes and underlying patterns + - Consider historical context and precedents + +2. Multi-Perspective Evaluation + - Examine issues from multiple stakeholder viewpoints + - Consider short-term and long-term implications + - Evaluate social, economic, technical, and ethical dimensions + - Challenge assumptions and identify potential biases + +3. Risk Assessment and Mitigation + - Conduct thorough risk analysis across scenarios + - Identify potential failure modes and edge cases + - Develop contingency plans and mitigation strategies + - Assess probability and impact of various outcomes + +4. Strategic Solution Development + - Generate multiple solution approaches + - Evaluate trade-offs between different strategies + - Consider resource constraints and limitations + - Design scalable and sustainable solutions + +5. 
Decision Framework Creation + - Establish clear evaluation criteria + - Weight competing priorities appropriately + - Create structured decision matrices + - Document reasoning and key decision factors + +6. Systems Thinking + - Map interconnections between system elements + - Identify feedback loops and cascade effects + - Consider emergent properties and behaviors + - Account for dynamic system evolution + +Your output should always include: +- Clear articulation of your analytical process +- Key assumptions and their justification +- Potential risks and mitigation strategies +- Multiple solution options with pros/cons +- Specific recommendations with supporting rationale +- Areas of uncertainty requiring further investigation + +Focus on developing robust, well-reasoned strategies that account for complexity while remaining practical and actionable.""" + +ACTION_AGENT_PROMPT = """You are an advanced implementation and execution agent focused on turning strategic plans into concrete results. + +Your core capabilities include: +1. Strategic Implementation Planning + - Break down high-level strategies into specific actions + - Create detailed project roadmaps and timelines + - Identify critical path dependencies + - Establish clear milestones and success metrics + - Design feedback and monitoring mechanisms + +2. Resource Optimization + - Assess resource requirements and constraints + - Optimize resource allocation and scheduling + - Identify efficiency opportunities + - Plan for scalability and flexibility + - Manage competing priorities effectively + +3. Execution Management + - Develop detailed implementation procedures + - Create clear operational guidelines + - Establish quality control measures + - Design progress tracking systems + - Build in review and adjustment points + +4. 
Risk Management + - Implement specific risk mitigation measures + - Create early warning systems + - Develop contingency procedures + - Establish fallback positions + - Monitor risk indicators + +5. Stakeholder Management + - Identify key stakeholders and their needs + - Create communication plans + - Establish feedback mechanisms + - Manage expectations effectively + - Build support and buy-in + +6. Continuous Improvement + - Monitor implementation effectiveness + - Gather and analyze performance data + - Identify improvement opportunities + - Implement iterative enhancements + - Document lessons learned + +Your output should always include: +- Detailed action plans with specific steps +- Resource requirements and allocation plans +- Timeline with key milestones +- Success metrics and monitoring approach +- Risk mitigation procedures +- Communication and stakeholder management plans +- Quality control measures +- Feedback and adjustment mechanisms + +Focus on practical, efficient, and effective implementation while maintaining high quality standards and achieving desired outcomes.""" + +# Initialize the thinking agent +thinking_agent = Agent( + agent_name="Strategic-Thinker", + agent_description="Deep analysis and strategic planning agent", + system_prompt=THINKING_AGENT_PROMPT, + max_loops=1, + llm=model, + dynamic_temperature_enabled=True, +) + +# Initialize the action agent +action_agent = Agent( + agent_name="Action-Executor", + agent_description="Practical implementation and execution agent", + system_prompt=ACTION_AGENT_PROMPT, + max_loops=1, + model_name="gpt-4o", + dynamic_temperature_enabled=True, +) + + +def run_reasoning_duo(task: str): + # Step 1: Thinking Agent + thinking_result = thinking_agent.run(task) + + # Step 2: Action Agent + action_result = action_agent.run( + f"From {thinking_agent.agent_name}: {thinking_result}" + ) + return action_result + + +if __name__ == "__main__": + run_reasoning_duo("What is the best way to invest $1000?") diff --git 
a/examples/swarm_eval_deepseek.py b/examples/swarm_eval_deepseek.py new file mode 100644 index 00000000..ac4a9408 --- /dev/null +++ b/examples/swarm_eval_deepseek.py @@ -0,0 +1,170 @@ +from loguru import logger +from swarms.structs.swarm_eval import ( + SwarmEvaluator, + PRESET_DATASETS, +) + +import os +from swarms import Agent +from dotenv import load_dotenv + +from swarm_models import OpenAIChat + +load_dotenv() + + +model = OpenAIChat( + model_name="deepseek-ai/DeepSeek-R1-Distill-Llama-70B-free", + openai_api_key=os.getenv("TOGETHER_API_KEY"), + base_url="https://api.together.xyz/v1", +) + +# Define system prompts for reasoning agents +THINKING_AGENT_PROMPT = """You are a sophisticated analytical and strategic thinking agent focused on deep problem analysis and solution design. + +Your core capabilities include: +1. Comprehensive Problem Analysis + - Break down complex problems into constituent elements + - Map relationships and dependencies between components + - Identify root causes and underlying patterns + - Consider historical context and precedents + +2. Multi-Perspective Evaluation + - Examine issues from multiple stakeholder viewpoints + - Consider short-term and long-term implications + - Evaluate social, economic, technical, and ethical dimensions + - Challenge assumptions and identify potential biases + +3. Risk Assessment and Mitigation + - Conduct thorough risk analysis across scenarios + - Identify potential failure modes and edge cases + - Develop contingency plans and mitigation strategies + - Assess probability and impact of various outcomes + +4. Strategic Solution Development + - Generate multiple solution approaches + - Evaluate trade-offs between different strategies + - Consider resource constraints and limitations + - Design scalable and sustainable solutions + +5. 
Decision Framework Creation + - Establish clear evaluation criteria + - Weight competing priorities appropriately + - Create structured decision matrices + - Document reasoning and key decision factors + +6. Systems Thinking + - Map interconnections between system elements + - Identify feedback loops and cascade effects + - Consider emergent properties and behaviors + - Account for dynamic system evolution + +Your output should always include: +- Clear articulation of your analytical process +- Key assumptions and their justification +- Potential risks and mitigation strategies +- Multiple solution options with pros/cons +- Specific recommendations with supporting rationale +- Areas of uncertainty requiring further investigation + +Focus on developing robust, well-reasoned strategies that account for complexity while remaining practical and actionable.""" + +ACTION_AGENT_PROMPT = """You are an advanced implementation and execution agent focused on turning strategic plans into concrete results. + +Your core capabilities include: +1. Strategic Implementation Planning + - Break down high-level strategies into specific actions + - Create detailed project roadmaps and timelines + - Identify critical path dependencies + - Establish clear milestones and success metrics + - Design feedback and monitoring mechanisms + +2. Resource Optimization + - Assess resource requirements and constraints + - Optimize resource allocation and scheduling + - Identify efficiency opportunities + - Plan for scalability and flexibility + - Manage competing priorities effectively + +3. Execution Management + - Develop detailed implementation procedures + - Create clear operational guidelines + - Establish quality control measures + - Design progress tracking systems + - Build in review and adjustment points + +4. 
Risk Management + - Implement specific risk mitigation measures + - Create early warning systems + - Develop contingency procedures + - Establish fallback positions + - Monitor risk indicators + +5. Stakeholder Management + - Identify key stakeholders and their needs + - Create communication plans + - Establish feedback mechanisms + - Manage expectations effectively + - Build support and buy-in + +6. Continuous Improvement + - Monitor implementation effectiveness + - Gather and analyze performance data + - Identify improvement opportunities + - Implement iterative enhancements + - Document lessons learned + +Your output should always include: +- Detailed action plans with specific steps +- Resource requirements and allocation plans +- Timeline with key milestones +- Success metrics and monitoring approach +- Risk mitigation procedures +- Communication and stakeholder management plans +- Quality control measures +- Feedback and adjustment mechanisms + +Focus on practical, efficient, and effective implementation while maintaining high quality standards and achieving desired outcomes.""" + +# Initialize the thinking agent +thinking_agent = Agent( + agent_name="Strategic-Thinker", + agent_description="Deep analysis and strategic planning agent", + system_prompt=THINKING_AGENT_PROMPT, + max_loops=1, + llm=model, + dynamic_temperature_enabled=True, +) + + +class DeepSeekSwarm: + def __init__(self): + self.thinking_agent = thinking_agent + + def run(self, task: str): + first_one = self.thinking_agent.run(task) + + return self.thinking_agent.run(first_one) + + +if __name__ == "__main__": + # Initialize the swarm (replace with your actual multi-agent system) + swarm = DeepSeekSwarm() + + # Initialize the evaluator with the swarm instance + evaluator = SwarmEvaluator(swarm) + + logger.info("Starting evaluation for dataset: gsm8k") + + # For demonstration, we use 4 concurrent workers, show progress, and save results. 
+ results = evaluator.evaluate( + "gsm8k", + split="train", + config=PRESET_DATASETS["gsm8k"], + max_workers=os.cpu_count(), + max_retries=3, + show_progress=True, + output_file="gsm8k_results.txt", + ) + + logger.info(f"Results for gsm8k: {results}") diff --git a/majority_voting_example.py b/majority_voting_example.py new file mode 100644 index 00000000..047b4878 --- /dev/null +++ b/majority_voting_example.py @@ -0,0 +1,52 @@ +from swarms import Agent, MajorityVoting + +# Initialize multiple agents with different specialties +agents = [ + Agent( + agent_name="Financial-Analysis-Agent", + agent_description="Personal finance advisor focused on market analysis", + system_prompt="You are a financial advisor specializing in market analysis and investment opportunities.", + max_loops=1, + model_name="gpt-4o" + ), + Agent( + agent_name="Risk-Assessment-Agent", + agent_description="Risk analysis and portfolio management expert", + system_prompt="You are a risk assessment expert focused on evaluating investment risks and portfolio diversification.", + max_loops=1, + model_name="gpt-4o" + ), + Agent( + agent_name="Tech-Investment-Agent", + agent_description="Technology sector investment specialist", + system_prompt="You are a technology investment specialist focused on AI, emerging tech, and growth opportunities.", + max_loops=1, + model_name="gpt-4o" + ) +] + + +consensus_agent = Agent( + agent_name="Consensus-Agent", + agent_description="Consensus agent focused on analyzing investment advice", + system_prompt="You are a consensus agent focused on analyzing investment advice and providing a final answer.", + max_loops=1, + model_name="gpt-4o" +) + +# Create majority voting system +majority_voting = MajorityVoting( + name="Investment-Advisory-System", + description="Multi-agent system for investment advice", + agents=agents, + verbose=True, + consensus_agent=consensus_agent +) + +# Run the analysis with majority voting +result = majority_voting.run( + task="Create a table 
of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.", + correct_answer="" # Optional evaluation metric +) + +print(result) diff --git a/pyproject.toml b/pyproject.toml index cb82760d..67d9de10 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "7.1.2" +version = "7.1.7" description = "Swarms - TGSC" license = "MIT" authors = ["Kye Gomez "] @@ -73,7 +73,6 @@ docstring_parser = "0.16" # TODO: tiktoken = "*" networkx = "*" aiofiles = "*" -clusterops = "*" # chromadb = "*" rich = "*" numpy = "*" diff --git a/requirements.txt b/requirements.txt index 10c9fa3e..10178b92 100644 --- a/requirements.txt +++ b/requirements.txt @@ -21,5 +21,4 @@ types-chardet>=5.0.4.6 mypy-protobuf>=3.0.0 pytest>=8.1.1 networkx -aiofiles -clusterops \ No newline at end of file +aiofiles \ No newline at end of file diff --git a/simple_example_ollama.py b/simple_example_ollama.py new file mode 100644 index 00000000..41ca5e04 --- /dev/null +++ b/simple_example_ollama.py @@ -0,0 +1,18 @@ +from swarms import Agent +from swarms.prompts.finance_agent_sys_prompt import ( + FINANCIAL_AGENT_SYS_PROMPT, +) + + +# Initialize the agent +agent = Agent( + agent_name="Financial-Analysis-Agent", + agent_description="Personal finance advisor agent", + system_prompt=FINANCIAL_AGENT_SYS_PROMPT, + max_loops=1, + model_name="ollama/llama2", +) + +agent.run( + "Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. 
Please create a table in markdown.", +) diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index 5d88260c..89d10fac 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -75,7 +75,6 @@ from swarms.structs.swarming_architectures import ( staircase_swarm, star_swarm, ) -from swarms.structs.task import Task __all__ = [ @@ -98,7 +97,6 @@ __all__ = [ "rearrange", "RoundRobinSwarm", "SequentialWorkflow", - "Task", "MixtureOfAgents", "GraphWorkflow", "Node", diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 873d89c9..783206ad 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -49,9 +49,6 @@ from swarms.utils.data_to_text import data_to_text from swarms.utils.file_processing import create_file_in_folder from swarms.utils.formatter import formatter from swarms.utils.pdf_to_text import pdf_to_text -from swarms.utils.wrapper_clusterop import ( - exec_callable_with_clusterops, -) from swarms.telemetry.main import log_agent_data from swarms.agents.agent_print import agent_print from swarms.utils.litellm_tokenizer import count_tokens @@ -760,6 +757,7 @@ class Agent: is_last: Optional[bool] = False, print_task: Optional[bool] = False, generate_speech: Optional[bool] = False, + correct_answer: Optional[str] = None, *args, **kwargs, ) -> Any: @@ -858,6 +856,11 @@ class Agent: # Convert to a str if the response is not a str response = self.llm_output_parser(response) + # if correct_answer is not None: + # if correct_answer not in response: + # logger.info("Correct answer found in response") + # # break + # Print if self.streaming_on is True: # self.stream_response(response) @@ -2465,14 +2468,6 @@ class Agent: ValueError: If an invalid device is specified. Exception: If any other error occurs during execution. 
""" - device = device or self.device - device_id = device_id or self.device_id - all_cores = all_cores or self.all_cores - all_gpus = all_gpus or self.all_gpus - - do_not_use_cluster_ops = ( - do_not_use_cluster_ops or self.do_not_use_cluster_ops - ) if scheduled_run_date: while datetime.now() < scheduled_run_date: @@ -2482,34 +2477,16 @@ class Agent: try: # If cluster ops disabled, run directly - if do_not_use_cluster_ops is True: - logger.info("Running without cluster operations") - return self._run( - task=task, - img=img, - *args, - **kwargs, - ) - - else: - return exec_callable_with_clusterops( - device=device, - device_id=device_id, - all_cores=all_cores, - all_gpus=all_gpus, - func=self._run, - task=task, - img=img, - *args, - **kwargs, - ) + return self._run( + task=task, + img=img, + *args, + **kwargs, + ) except ValueError as e: self._handle_run_error(e) - except Exception as e: - self._handle_run_error(e) - def handle_artifacts( self, text: str, file_output_path: str, file_extension: str ) -> None: diff --git a/swarms/structs/base_workflow.py b/swarms/structs/base_workflow.py index 4107042a..5f0e799c 100644 --- a/swarms/structs/base_workflow.py +++ b/swarms/structs/base_workflow.py @@ -4,7 +4,6 @@ from typing import Any, Dict, List, Optional from swarms.utils.formatter import formatter from swarms.structs.agent import Agent from swarms.structs.base_structure import BaseStructure -from swarms.structs.task import Task from swarms.utils.loguru_logger import initialize_logger logger = initialize_logger("base-workflow") @@ -43,7 +42,7 @@ class BaseWorkflow(BaseStructure): def __init__( self, agents: List[Agent] = None, - task_pool: List[Task] = None, + task_pool: List[str] = None, models: List[Any] = None, *args, **kwargs, @@ -69,8 +68,8 @@ class BaseWorkflow(BaseStructure): def add_task( self, - task: Task = None, - tasks: List[Task] = None, + task: str = None, + tasks: List[str] = None, *args, **kwargs, ): @@ -293,12 +292,6 @@ class 
BaseWorkflow(BaseStructure): "green", ) - task = Task( - description=task, - agent=kwargs["agent"], - args=list(kwargs["args"]), - kwargs=kwargs["kwargs"], - ) self.tasks.append(task) except Exception as error: formatter.print_panel( @@ -306,48 +299,6 @@ class BaseWorkflow(BaseStructure): ) raise error - def load_workflow_state( - self, filepath: str = None, **kwargs - ) -> None: - """ - Loads the workflow state from a json file and restores the workflow state. - - Args: - filepath (str): The path to load the workflow state from. - - Examples: - >>> from swarm_models import OpenAIChat - >>> from swarms.structs import SequentialWorkflow - >>> llm = OpenAIChat(openai_api_key="") - >>> workflow = SequentialWorkflow(max_loops=1) - >>> workflow.add("What's the weather in miami", llm) - >>> workflow.add("Create a report on these metrics", llm) - >>> workflow.save_workflow_state("sequential_workflow_state.json") - >>> workflow.load_workflow_state("sequential_workflow_state.json") - - """ - try: - filepath = filepath or self.restore_state_filepath - - with open(filepath) as f: - state = json.load(f) - self.max_loops = state["max_loops"] - self.tasks = [] - for task_state in state["tasks"]: - task = Task( - description=task_state["description"], - agent=task_state["agent"], - args=task_state["args"], - kwargs=task_state["kwargs"], - result=task_state["result"], - history=task_state["history"], - ) - self.tasks.append(task) - except Exception as error: - formatter.print_panel( - f"Error loading workflow state: {error}", - ) - def workflow_dashboard(self, **kwargs) -> None: """ Displays a dashboard for the workflow. 
diff --git a/swarms/structs/csv_to_agent.py b/swarms/structs/csv_to_agent.py index 624e3577..2b8ecf9c 100644 --- a/swarms/structs/csv_to_agent.py +++ b/swarms/structs/csv_to_agent.py @@ -8,7 +8,7 @@ from dataclasses import dataclass import csv from pathlib import Path from enum import Enum -from swarms import Agent +from swarms.structs.agent import Agent class ModelName(str, Enum): diff --git a/swarms/structs/graph_swarm.py b/swarms/structs/graph_swarm.py index e67add52..1bbc1673 100644 --- a/swarms/structs/graph_swarm.py +++ b/swarms/structs/graph_swarm.py @@ -14,15 +14,6 @@ from swarms.utils.auto_download_check_packages import ( auto_check_and_download_package, ) -# Configure logging -logger.add( - "graphswarm.log", - rotation="500 MB", - retention="10 days", - level="INFO", - format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", -) - class AgentOutput(BaseModel): """Structured output from an agent.""" diff --git a/swarms/structs/groupchat.py b/swarms/structs/groupchat.py index 7428461b..05d96260 100644 --- a/swarms/structs/groupchat.py +++ b/swarms/structs/groupchat.py @@ -1,9 +1,12 @@ import concurrent.futures from datetime import datetime +import os from typing import Callable, List +from dotenv import load_dotenv from loguru import logger from pydantic import BaseModel, Field +from swarm_models import OpenAIChat from swarms.structs.agent import Agent @@ -123,7 +126,7 @@ class GroupChat: description: str = "A group chat for multiple agents", agents: List[Agent] = [], speaker_fn: SpeakerFunction = round_robin, - max_loops: int = 10, + max_loops: int = 1, ): """ Initialize the GroupChat. 
@@ -171,7 +174,9 @@ class GroupChat: Previous messages: {self.get_full_chat_history()} """ # Updated line - message = agent.run(context + prompt) + message = agent.run( + task=f"From {agent.agent_name}: {context} \n {prompt}" + ) return AgentResponse( agent_name=agent.name, role=agent.system_prompt, @@ -224,7 +229,7 @@ class GroupChat: def run(self, task: str) -> ChatHistory: """ - Run the group chat. + Run the group chat, feeding the context of previous turns into each new turn. Args: task (str): The initial message to start the chat. @@ -242,12 +247,22 @@ class GroupChat: turn_number=turn, responses=[], task=task ) + # Get context from previous turns + context = self.get_full_chat_history() + + # Combine task with context for agents + contextualized_task = ( + f"{task}\n\nPrevious conversation:\n{context}" + if context + else task + ) + for agent in self.agents: if self.speaker_fn( self.get_recent_messages(), agent ): response = self._get_response_sync( - agent, task, turn + agent, contextualized_task, turn ) current_turn.responses.append(response) self.chat_history.total_messages += 1 @@ -293,63 +308,63 @@ class GroupChat: ) -# if __name__ == "__main__": - -# load_dotenv() - -# # Get the OpenAI API key from the environment variable -# api_key = os.getenv("OPENAI_API_KEY") - -# # Create an instance of the OpenAIChat class -# model = OpenAIChat( -# openai_api_key=api_key, -# model_name="gpt-4o-mini", -# temperature=0.1, -# ) - -# # Example agents -# agent1 = Agent( -# agent_name="Financial-Analysis-Agent", -# system_prompt="You are a financial analyst specializing in investment strategies.", -# llm=model, -# max_loops=1, -# autosave=False, -# dashboard=False, -# verbose=True, -# dynamic_temperature_enabled=True, -# user_name="swarms_corp", -# retry_attempts=1, -# context_length=200000, -# output_type="string", -# streaming_on=False, -# ) - -# agent2 = Agent( -# agent_name="Tax-Adviser-Agent", -# system_prompt="You are a tax adviser who provides clear and concise 
guidance on tax-related queries.", -# llm=model, -# max_loops=1, -# autosave=False, -# dashboard=False, -# verbose=True, -# dynamic_temperature_enabled=True, -# user_name="swarms_corp", -# retry_attempts=1, -# context_length=200000, -# output_type="string", -# streaming_on=False, -# ) - -# agents = [agent1, agent2] - -# chat = GroupChat( -# name="Investment Advisory", -# description="Financial and tax analysis group", -# agents=agents, -# speaker_fn=expertise_based, -# ) - -# history = chat.run( -# "How to optimize tax strategy for investments?" -# ) -# print(history.model_dump_json(indent=2)) +if __name__ == "__main__": + + load_dotenv() + + # Get the OpenAI API key from the environment variable + api_key = os.getenv("OPENAI_API_KEY") + + # Create an instance of the OpenAIChat class + model = OpenAIChat( + openai_api_key=api_key, + model_name="gpt-4o-mini", + temperature=0.1, + ) + + # Example agents + agent1 = Agent( + agent_name="Financial-Analysis-Agent", + system_prompt="You are a financial analyst specializing in investment strategies.", + llm=model, + max_loops=1, + autosave=False, + dashboard=False, + verbose=True, + dynamic_temperature_enabled=True, + user_name="swarms_corp", + retry_attempts=1, + context_length=200000, + output_type="string", + streaming_on=False, + ) + + agent2 = Agent( + agent_name="Tax-Adviser-Agent", + system_prompt="You are a tax adviser who provides clear and concise guidance on tax-related queries.", + llm=model, + max_loops=1, + autosave=False, + dashboard=False, + verbose=True, + dynamic_temperature_enabled=True, + user_name="swarms_corp", + retry_attempts=1, + context_length=200000, + output_type="string", + streaming_on=False, + ) + + agents = [agent1, agent2] + + chat = GroupChat( + name="Investment Advisory", + description="Financial and tax analysis group", + agents=agents, + speaker_fn=expertise_based, + ) + + history = chat.run( + "How to optimize tax strategy for investments?" 
+ ) + print(history.model_dump_json(indent=2)) diff --git a/swarms/structs/majority_voting.py b/swarms/structs/majority_voting.py index f8ee64ce..dd8deac4 100644 --- a/swarms/structs/majority_voting.py +++ b/swarms/structs/majority_voting.py @@ -1,6 +1,5 @@ import asyncio import concurrent.futures -import multiprocessing import os import re from collections import Counter @@ -147,6 +146,7 @@ class MajorityVoting: consensus_agent: Optional[Agent] = None, autosave: bool = False, verbose: bool = False, + max_loops: int = 1, *args, **kwargs, ): @@ -157,6 +157,7 @@ class MajorityVoting: self.consensus_agent = consensus_agent self.autosave = autosave self.verbose = verbose + self.max_loops = max_loops self.conversation = Conversation( time_enabled=True, *args, **kwargs @@ -175,7 +176,9 @@ class MajorityVoting: title="Majority Voting", ) - def run(self, task: str, *args, **kwargs) -> List[Any]: + def run( + self, task: str, correct_answer: str, *args, **kwargs + ) -> List[Any]: """ Runs the majority voting system and returns the majority vote. @@ -200,25 +203,44 @@ class MajorityVoting: ) self.conversation.add(agent.agent_name, response) - # Perform majority voting on the conversation - responses = [ - message["content"] - for message in self.conversation.conversation_history - if message["role"] == "agent" - ] + responses = self.conversation.return_history_as_string() + print(responses) + + prompt = f"""Conduct a detailed majority voting analysis on the following conversation: + {responses} + + Between the following agents: {[agent.agent_name for agent in self.agents]} + + Please: + 1. Identify the most common answer/recommendation across all agents + 2. Analyze any major disparities or contrasting viewpoints between agents + 3. Highlight key areas of consensus and disagreement + 4. Evaluate the strength of the majority opinion + 5. Note any unique insights from minority viewpoints + 6. 
Provide a final synthesized recommendation based on the majority consensus + + Focus on finding clear patterns while being mindful of important nuances in the responses. + """ # If an output parser is provided, parse the responses - if self.output_parser is not None: - majority_vote = self.output_parser( - responses, *args, **kwargs + if self.consensus_agent is not None: + majority_vote = self.consensus_agent.run( + prompt + ) + + self.conversation.add( + self.consensus_agent.agent_name, majority_vote ) - elif self.consensus_agent is not None: - majority_vote = self.consensus_agent.run(responses) else: - majority_vote = majority_voting(responses) + # fetch the last agent + majority_vote = self.agents[-1].run(prompt) + + self.conversation.add( + self.agents[-1].agent_name, majority_vote + ) # Return the majority vote - return majority_vote + return self.conversation.return_history_as_string() def batch_run( self, tasks: List[str], *args, **kwargs @@ -262,23 +284,6 @@ class MajorityVoting: for future in concurrent.futures.as_completed(futures) ] - def run_concurrently_multiprocess( - self, tasks: List[str], *args, **kwargs - ) -> List[Any]: - """ - Runs the majority voting system concurrently using multiprocessing. - - Args: - tasks (List[str]): List of tasks to be performed by the agents. - *args: Variable length argument list. - **kwargs: Arbitrary keyword arguments. - - Returns: - List[Any]: List of majority votes for each task. 
- """ - with multiprocessing.Pool(processes=os.cpu_count()) as pool: - return pool.map(self.run, tasks) - async def run_async( self, tasks: List[str], *args, **kwargs ) -> List[Any]: diff --git a/swarms/structs/mixture_of_agents.py b/swarms/structs/mixture_of_agents.py index 5fcd419d..f5019c04 100644 --- a/swarms/structs/mixture_of_agents.py +++ b/swarms/structs/mixture_of_agents.py @@ -1,4 +1,5 @@ import asyncio +import os import time from typing import Any, Dict, List, Optional @@ -9,6 +10,8 @@ from swarms.telemetry.main import log_agent_data from swarms.schemas.agent_step_schemas import ManySteps from swarms.prompts.ag_prompt import aggregator_system_prompt from swarms.utils.loguru_logger import initialize_logger +from swarms.utils.any_to_str import any_to_str +import concurrent.futures logger = initialize_logger(log_folder="mixture_of_agents") @@ -64,6 +67,8 @@ class MixtureOfAgents: aggregator_agent: Agent = None, aggregator_system_prompt: str = "", layers: int = 3, + max_loops: int = 1, + return_str_on: bool = False, ) -> None: """ Initialize the Mixture of Agents class with agents and configuration. @@ -82,6 +87,8 @@ class MixtureOfAgents: self.aggregator_agent: Agent = aggregator_agent self.aggregator_system_prompt: str = aggregator_system_prompt self.layers: int = layers + self.max_loops: int = max_loops + self.return_str_on: bool = return_str_on self.input_schema = MixtureOfAgentsInput( name=name, @@ -233,10 +240,61 @@ class MixtureOfAgents: Args: task (str): The task for the mixture of agents. 
""" - asyncio.run(self._run_async(task)) + try: + prev_context = None + + for _ in range(self.max_loops): + # Add previous context to task if available + current_task = ( + f"{task}\n\nPrevious context:\n{prev_context}" + if prev_context + else task + ) - self.output_schema.task = task + # Run async process + asyncio.run(self._run_async(current_task)) + + # Store current results as context for next loop + prev_context = ( + self.output_schema.aggregator_agent_summary + ) + + self.output_schema.task = task + + log_agent_data(self.output_schema.model_dump()) + + if self.return_str_on: + return any_to_str(self.output_schema.model_dump()) + else: + return self.output_schema.model_dump_json(indent=4) + + except Exception as e: + logger.error(f"Error running mixture of agents: {str(e)}") + raise e + + def run_batched(self, tasks: List[str]) -> List[str]: + """ + Run the mixture of agents for a batch of tasks. + + Args: + tasks (List[str]): A list of tasks for the mixture of agents. - log_agent_data(self.output_schema.model_dump()) + Returns: + List[str]: A list of responses from the mixture of agents. + """ + return [self.run(task) for task in tasks] - return self.output_schema.model_dump_json(indent=4) + def run_concurrently(self, tasks: List[str]) -> List[str]: + """ + Run the mixture of agents for a batch of tasks concurrently. 
+ """ + with concurrent.futures.ThreadPoolExecutor( + max_workers=os.cpu_count() + ) as executor: + futures = [ + executor.submit(self.run, task) for task in tasks + ] + return [ + future.result() + for future in concurrent.futures.as_completed(futures) + ] diff --git a/swarms/structs/multi_agent_exec.py b/swarms/structs/multi_agent_exec.py index 884cd23c..28b6c421 100644 --- a/swarms/structs/multi_agent_exec.py +++ b/swarms/structs/multi_agent_exec.py @@ -3,16 +3,12 @@ import os import threading from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass -from multiprocessing import cpu_count from typing import Any, List import psutil from swarms.structs.agent import Agent from swarms.structs.omni_agent_types import AgentType -from swarms.utils.wrapper_clusterop import ( - exec_callable_with_clusterops, -) @dataclass @@ -113,7 +109,7 @@ def run_agents_concurrently( def run_agents_concurrently_multiprocess( - agents: List[Agent], task: str, batch_size: int = cpu_count() + agents: List[Agent], task: str, batch_size: int = os.cpu_count() ) -> List[Any]: """ Manage and run multiple agents concurrently in batches, with optimized performance. @@ -180,7 +176,7 @@ def run_agents_with_different_tasks( agent, task = pair return await run_agent_async(agent, task, executor) - cpu_cores = cpu_count() + cpu_cores = os.cpu_count() batch_size = batch_size or cpu_cores max_workers = max_workers or cpu_cores * 2 results = [] @@ -253,7 +249,7 @@ def run_agents_with_timeout( Returns: List of outputs (None for timed out agents) """ - cpu_cores = cpu_count() + cpu_cores = os.cpu_count() batch_size = batch_size or cpu_cores max_workers = max_workers or cpu_cores * 2 results = [] @@ -412,22 +408,9 @@ def run_agents_with_tasks_concurrently( List[Any]: A list of outputs from each agent execution. 
""" # Make the first agent not use the ifrs - - if no_clusterops: - return _run_agents_with_tasks_concurrently( - agents, tasks, batch_size, max_workers - ) - else: - return exec_callable_with_clusterops( - device, - device_id, - all_cores, - _run_agents_with_tasks_concurrently, - agents, - tasks, - batch_size, - max_workers, - ) + return _run_agents_with_tasks_concurrently( + agents, tasks, batch_size, max_workers + ) # # Example usage: diff --git a/swarms/structs/multi_process_workflow.py b/swarms/structs/multi_process_workflow.py deleted file mode 100644 index 7b04c10e..00000000 --- a/swarms/structs/multi_process_workflow.py +++ /dev/null @@ -1,244 +0,0 @@ -from multiprocessing import Manager, Pool, cpu_count -from typing import Sequence, Union, Callable, List -from concurrent.futures import ThreadPoolExecutor, as_completed - -from swarms.structs.agent import Agent -from swarms.structs.base_workflow import BaseWorkflow -from swarms.utils.loguru_logger import initialize_logger - -logger = initialize_logger(log_folder="multi_process_workflow") - - -class MultiProcessWorkflow(BaseWorkflow): - """ - Initialize a MultiProcessWorkflow object. - - Args: - max_workers (int): The maximum number of workers to use for parallel processing. - autosave (bool): Flag indicating whether to automatically save the workflow. - agents (List[Union[Agent, Callable]]): A list of Agent objects or callable functions representing the workflow tasks. - *args: Additional positional arguments. - **kwargs: Additional keyword arguments. 
- - Example: - >>> from swarms.structs.multi_process_workflow import MultiProcessingWorkflow - >>> from swarms.structs.task import Task - >>> from datetime import datetime - >>> from time import sleep - >>> - >>> # Define a simple task - >>> def simple_task(): - >>> sleep(1) - >>> return datetime.now() - >>> - >>> # Create a task object - >>> task = Task( - >>> name="Simple Task", - >>> execute=simple_task, - >>> priority=1, - >>> ) - >>> - >>> # Create a workflow with the task - >>> workflow = MultiProcessingWorkflow(tasks=[task]) - >>> - >>> # Run the workflow - >>> results = workflow.run(task) - >>> - >>> # Print the results - >>> print(results) - """ - - def __init__( - self, - max_workers: int = 5, - autosave: bool = True, - agents: Sequence[Union[Agent, Callable]] = None, - *args, - **kwargs, - ): - super().__init__(*args, **kwargs) - self.max_workers = max_workers - self.autosave = autosave - self.agents = agents - - self.max_workers or cpu_count() - - # Log - logger.info( - ( - "Initialized MultiProcessWorkflow with" - f" {self.max_workers} max workers and autosave set to" - f" {self.autosave}" - ), - ) - - # Log the agents - if self.agents is not None: - for agent in self.agents: - logger.info(f"Agent: {agent.agent_name}") - - def execute_task(self, task: str, *args, **kwargs): - """Execute a task and handle exceptions. - - Args: - task (Task): The task to execute. - *args: Additional positional arguments for the task execution. - **kwargs: Additional keyword arguments for the task execution. - - Returns: - Any: The result of the task execution. - - """ - try: - if self.agents is not None: - # Execute the task - for agent in self.agents: - result = agent.run(task, *args, **kwargs) - - return result - - except Exception as e: - logger.error( - ( - "An error occurred during execution of task" - f" {task}: {str(e)}" - ), - ) - return None - - def run(self, task: str, *args, **kwargs): - """Run the workflow. - - Args: - task (Task): The task to run. 
- *args: Additional positional arguments for the task execution. - **kwargs: Additional keyword arguments for the task execution. - - Returns: - List[Any]: The results of all executed tasks. - - """ - try: - results = [] - with Manager() as manager: - with Pool( - processes=self.max_workers, *args, **kwargs - ) as pool: - # Using manager.list() to collect results in a process safe way - results_list = manager.list() - jobs = [ - pool.apply_async( - self.execute_task, # Pass the function, not the function call - args=(task,) - + args, # Pass the arguments as a tuple - kwds=kwargs, # Pass the keyword arguments as a dictionary - callback=results_list.append, - timeout=task.timeout, - ) - for agent in self.agents - ] - - # Wait for all jobs to complete - for job in jobs: - job.get() - - results = list(results_list) - - return results - except Exception as error: - logger.error(f"Error in run: {error}") - return None - - async def async_run(self, task: str, *args, **kwargs): - """Asynchronously run the workflow. - - Args: - task (Task): The task to run. - *args: Additional positional arguments for the task execution. - **kwargs: Additional keyword arguments for the task execution. - - Returns: - List[Any]: The results of all executed tasks. - - """ - try: - results = [] - with ThreadPoolExecutor( - max_workers=self.max_workers - ) as executor: - futures = [ - executor.submit( - self.execute_task, task, *args, **kwargs - ) - for _ in range(len(self.agents)) - ] - for future in as_completed(futures): - result = future.result() - results.append(result) - - return results - except Exception as error: - logger.error(f"Error in async_run: {error}") - return None - - def batched_run( - self, tasks: List[str], batch_size: int = 5, *args, **kwargs - ): - """Run tasks in batches. - - Args: - tasks (List[str]): A list of tasks to run. - batch_size (int): The size of each batch. - *args: Additional positional arguments for the task execution. 
- **kwargs: Additional keyword arguments for the task execution. - - Returns: - List[Any]: The results of all executed tasks. - - """ - try: - results = [] - for i in range(0, len(tasks), batch_size): - batch = tasks[i : i + batch_size] - with Pool(processes=self.max_workers) as pool: - results_list = pool.map( - self.execute_task, batch, *args, **kwargs - ) - results.extend(results_list) - - return results - except Exception as error: - logger.error(f"Error in batched_run: {error}") - return None - - def concurrent_run(self, tasks: List[str], *args, **kwargs): - """Run tasks concurrently. - - Args: - tasks (List[str]): A list of tasks to run. - *args: Additional positional arguments for the task execution. - **kwargs: Additional keyword arguments for the task execution. - - Returns: - List[Any]: The results of all executed tasks. - - """ - try: - results = [] - with ThreadPoolExecutor( - max_workers=self.max_workers - ) as executor: - futures = [ - executor.submit( - self.execute_task, task, *args, **kwargs - ) - for task in tasks - ] - for future in as_completed(futures): - result = future.result() - results.append(result) - - return results - except Exception as error: - logger.error(f"Error in concurrent_run: {error}") - return None diff --git a/swarms/structs/rearrange.py b/swarms/structs/rearrange.py index b281c93e..b604eb59 100644 --- a/swarms/structs/rearrange.py +++ b/swarms/structs/rearrange.py @@ -13,9 +13,6 @@ from swarms.structs.agents_available import showcase_available_agents from swarms.structs.base_swarm import BaseSwarm from swarms.structs.output_types import OutputType from swarms.utils.loguru_logger import initialize_logger -from swarms.utils.wrapper_clusterop import ( - exec_callable_with_clusterops, -) from swarms.telemetry.main import log_agent_data logger = initialize_logger(log_folder="rearrange") @@ -523,29 +520,13 @@ class AgentRearrange(BaseSwarm): The result from executing the task through the cluster operations wrapper. 
""" try: - no_use_clusterops = ( - no_use_clusterops or self.no_use_clusterops + return self._run( + task=task, + img=img, + *args, + **kwargs, ) - if no_use_clusterops is True: - return self._run( - task=task, - img=img, - *args, - **kwargs, - ) - else: - return exec_callable_with_clusterops( - device=device, - device_id=device_id, - all_cores=all_cores, - all_gpus=all_gpus, - func=self._run, - task=task, - img=img, - *args, - **kwargs, - ) except Exception as e: self._catch_error(e) diff --git a/swarms/structs/swarm_arange.py b/swarms/structs/swarm_arange.py index efb880ad..ba1d3e3f 100644 --- a/swarms/structs/swarm_arange.py +++ b/swarms/structs/swarm_arange.py @@ -104,10 +104,6 @@ class SwarmRearrange: # Run the reliability checks self.reliability_checks() - # Logging configuration - if self.verbose: - logger.add("swarm_rearrange.log", rotation="10 MB") - def reliability_checks(self): logger.info("Running reliability checks.") if not self.swarms: diff --git a/swarms/structs/swarm_builder.py b/swarms/structs/swarm_builder.py index 71aacedd..0e56188d 100644 --- a/swarms/structs/swarm_builder.py +++ b/swarms/structs/swarm_builder.py @@ -14,8 +14,6 @@ from swarms.structs.agent import Agent from swarms.structs.swarm_router import SwarmRouter, SwarmType from swarms.utils.function_caller_model import OpenAIFunctionCaller -logger.add("swarm_builder.log", rotation="10 MB", backtrace=True) - BOSS_SYSTEM_PROMPT = """ Manage a swarm of worker agents to efficiently serve the user by deciding whether to create new agents or delegate tasks. Ensure operations are efficient and effective. 
diff --git a/swarms/structs/swarm_eval.py b/swarms/structs/swarm_eval.py new file mode 100644 index 00000000..ac47b291 --- /dev/null +++ b/swarms/structs/swarm_eval.py @@ -0,0 +1,326 @@ +import math +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import Any, Callable, Dict, Optional, Tuple + +from datasets import Dataset, load_dataset +from loguru import logger +from tqdm import tqdm + +# ----------------------------------------------------------------------------- +# Logging configuration: log to console and file (rotating by size) +# ----------------------------------------------------------------------------- + +# ----------------------------------------------------------------------------- +# Swarm interface example +# ----------------------------------------------------------------------------- + + +# ----------------------------------------------------------------------------- +# Benchmark configuration +# ----------------------------------------------------------------------------- +class BenchmarkConfig: + """ + Configuration for a benchmark dataset. + + Attributes: + input_column (str): The column containing the task prompt. + answer_column (str): The column containing the expected answer. + answer_extractor (Optional[Callable[[Any], str]]): Function to extract + a string answer from the dataset's raw answer format. + answer_matcher (Optional[Callable[[str, str], bool]]): Function to compare + the expected answer and the swarm output. If None, a simple substring + containment is used. 
+ """ + + def __init__( + self, + input_column: str, + answer_column: str, + answer_extractor: Optional[Callable[[Any], str]] = None, + answer_matcher: Optional[Callable[[str, str], bool]] = None, + ): + self.input_column = input_column + self.answer_column = answer_column + self.answer_extractor = answer_extractor + self.answer_matcher = answer_matcher + + +# ----------------------------------------------------------------------------- +# Preset dataset configurations for popular benchmarks +# ----------------------------------------------------------------------------- +PRESET_DATASETS: Dict[str, BenchmarkConfig] = { + "gsm8k": BenchmarkConfig( + input_column="question", + answer_column="answer", + ), + "squad": BenchmarkConfig( + input_column="question", + answer_column="answers", + answer_extractor=lambda ans: ( + ans["text"][0] + if isinstance(ans, dict) + and "text" in ans + and isinstance(ans["text"], list) + and ans["text"] + else str(ans) + ), + ), + "winogrande": BenchmarkConfig( + input_column="sentence", + answer_column="answer", + ), + "commonsense_qa": BenchmarkConfig( + input_column="question", + answer_column="answerKey", + ), + # Add additional presets here. +} + + +# ----------------------------------------------------------------------------- +# SwarmEvaluator with extended features +# ----------------------------------------------------------------------------- +class SwarmEvaluator: + """ + Evaluator that uses a swarm of agents to process benchmark datasets + from Hugging Face, with concurrency, retries, progress display, performance timing, + and customizable answer matching. + + Example: + swarm = Swarm() + evaluator = SwarmEvaluator(swarm) + results = evaluator.evaluate("gsm8k", split="test", max_workers=4) + print(results) + """ + + def __init__(self, swarm: callable) -> None: + """ + Initialize the evaluator with a given swarm. + + Args: + swarm (Swarm): A swarm instance with a callable run(task: str) method. 
+ """ + self.swarm = swarm + + def evaluate( + self, + dataset_name: str, + split: str = "test", + config: Optional[BenchmarkConfig] = None, + max_workers: int = 1, + max_retries: int = 3, + show_progress: bool = True, + output_file: Optional[str] = None, + ) -> Dict[str, Any]: + """ + Evaluate the specified benchmark dataset using the swarm. + + Args: + dataset_name (str): The dataset name (from Hugging Face). + split (str): The dataset split (e.g., "test", "validation"). + config (Optional[BenchmarkConfig]): Benchmark configuration. If None, + a preset config is used. + max_workers (int): Number of concurrent workers. + max_retries (int): Number of retries for swarm tasks on failure. + show_progress (bool): If True, display a progress bar. + output_file (Optional[str]): Path to a file to write the results. + + Returns: + Dict[str, Any]: Evaluation metrics including total examples, correct answers, + accuracy, and total evaluation time. + """ + if config is None: + config = PRESET_DATASETS.get(dataset_name) + if config is None: + raise ValueError( + f"No preset config for dataset '{dataset_name}'. Provide a BenchmarkConfig." + ) + + logger.info( + f"Loading dataset '{dataset_name}' (split: {split})..." + ) + dataset: Dataset = load_dataset(dataset_name, split=split) + total_examples = len(dataset) + logger.info(f"Total examples to evaluate: {total_examples}") + + start_time = time.time() + correct = 0 + + # Function to process a single example. + def _process_example( + example: Dict[str, Any], idx: int + ) -> Tuple[bool, float]: + task_start = time.time() + task_text = example.get(config.input_column) + expected_answer = example.get(config.answer_column) + + if task_text is None or expected_answer is None: + logger.warning( + f"Example {idx}: Missing '{config.input_column}' or '{config.answer_column}', skipping." + ) + return (False, 0.0) + + # Use answer_extractor if provided. 
+ if config.answer_extractor: + try: + expected_answer = config.answer_extractor( + expected_answer + ) + except Exception as e: + logger.error( + f"Example {idx}: Error extracting answer: {e}" + ) + return (False, 0.0) + + logger.debug(f"Example {idx} - Task: {task_text}") + logger.debug( + f"Example {idx} - Expected Answer: {expected_answer}" + ) + + try: + swarm_output = self._run_with_retry( + task_text, max_retries + ) + except Exception as e: + logger.error( + f"Example {idx}: Failed after retries. Error: {e}" + ) + return (False, time.time() - task_start) + + logger.debug( + f"Example {idx} - Swarm Output: {swarm_output}" + ) + + # Use custom matcher if provided; otherwise, default matching. + if config.answer_matcher: + is_correct = config.answer_matcher( + expected_answer, swarm_output + ) + else: + is_correct = self._default_matcher( + expected_answer, swarm_output + ) + + task_time = time.time() - task_start + logger.info( + f"Example {idx}: {'Correct' if is_correct else 'Incorrect'} in {task_time:.2f}s" + ) + return (is_correct, task_time) + + # Use ThreadPoolExecutor for concurrency. + futures = [] + total_time = 0.0 + with ThreadPoolExecutor(max_workers=max_workers) as executor: + # Optionally wrap the dataset with tqdm for a progress bar. + examples_iter = enumerate(dataset, start=1) + if show_progress: + examples_iter = tqdm( + list(examples_iter), + total=total_examples, + desc="Evaluating", + ) + + for idx, example in examples_iter: + futures.append( + executor.submit(_process_example, example, idx) + ) + + for future in as_completed(futures): + try: + is_correct, elapsed = future.result() + total_time += elapsed + if is_correct: + correct += 1 + except Exception as e: + logger.error(f"Error processing an example: {e}") + + overall_time = time.time() - start_time + accuracy = ( + correct / total_examples if total_examples > 0 else 0.0 + ) + + logger.info( + f"Evaluation complete. 
Total examples: {total_examples}, Correct: {correct}, " + f"Accuracy: {accuracy:.2%}, Overall Time: {overall_time:.2f}s, " + f"Average per-example time: {total_time/total_examples if total_examples else 0:.2f}s" + ) + + results = { + "total": total_examples, + "correct": correct, + "accuracy": accuracy, + "overall_time": overall_time, + "average_example_time": ( + total_time / total_examples + if total_examples + else math.nan + ), + } + + # Optionally save results to a file. + if output_file: + try: + with open(output_file, "w") as f: + for key, value in results.items(): + f.write(f"{key}: {value}\n") + logger.info(f"Results saved to {output_file}") + except Exception as e: + logger.error( + f"Error saving results to {output_file}: {e}" + ) + + return results + + def _run_with_retry(self, task: str, max_retries: int) -> str: + """ + Runs the swarm task with a retry mechanism. + + Args: + task (str): The task string. + max_retries (int): Maximum number of retries. + + Returns: + str: Swarm output. + + Raises: + Exception: If all retries fail. + """ + attempt = 0 + while attempt <= max_retries: + try: + start = time.time() + result = self.swarm.run(task) + elapsed = time.time() - start + logger.debug( + f"Task succeeded in {elapsed:.2f}s on attempt {attempt + 1}" + ) + return result + except Exception as e: + logger.warning( + f"Task failed on attempt {attempt + 1}: {e}" + ) + attempt += 1 + time.sleep(0.5 * attempt) # Exponential backoff + raise Exception("Max retries exceeded for task.") + + @staticmethod + def _default_matcher(expected: str, output: str) -> bool: + """ + Default answer matching using a normalized substring check. + + Args: + expected (str): The expected answer. + output (str): The swarm output. + + Returns: + bool: True if expected is found in output; otherwise, False. 
+ """ + expected_norm = " ".join(expected.strip().split()) + output_norm = " ".join(output.strip().split()) + return expected_norm in output_norm + + +# ----------------------------------------------------------------------------- +# Example usage +# ----------------------------------------------------------------------------- diff --git a/swarms/structs/swarm_matcher.py b/swarms/structs/swarm_matcher.py index 32f5c834..a0bfef2c 100644 --- a/swarms/structs/swarm_matcher.py +++ b/swarms/structs/swarm_matcher.py @@ -42,8 +42,6 @@ class SwarmMatcher: Args: config (SwarmMatcherConfig): The configuration for the SwarmMatcher. """ - logger.add("swarm_matcher_debug.log", level="DEBUG") - logger.debug("Initializing SwarmMatcher") try: import torch diff --git a/swarms/structs/swarm_router.py b/swarms/structs/swarm_router.py index e0ca4d05..bf829357 100644 --- a/swarms/structs/swarm_router.py +++ b/swarms/structs/swarm_router.py @@ -1,3 +1,4 @@ +import os import uuid from datetime import datetime from typing import Any, Callable, Dict, List, Literal, Union @@ -8,16 +9,14 @@ from tenacity import retry, stop_after_attempt, wait_fixed from swarms.prompts.ag_prompt import aggregator_system_prompt from swarms.structs.agent import Agent from swarms.structs.concurrent_workflow import ConcurrentWorkflow +from swarms.structs.csv_to_agent import AgentLoader +from swarms.structs.groupchat import GroupChat from swarms.structs.mixture_of_agents import MixtureOfAgents +from swarms.structs.multi_agent_orchestrator import MultiAgentRouter from swarms.structs.rearrange import AgentRearrange from swarms.structs.sequential_workflow import SequentialWorkflow from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm -from swarms.structs.groupchat import GroupChat -from swarms.structs.multi_agent_orchestrator import MultiAgentRouter from swarms.structs.swarm_matcher import swarm_matcher -from swarms.utils.wrapper_clusterop import ( - exec_callable_with_clusterops, -) from 
swarms.utils.loguru_logger import initialize_logger logger = initialize_logger(log_folder="swarm_router") @@ -142,6 +141,8 @@ class SwarmRouter: output_type: str = "string", # Md, PDF, Txt, csv no_cluster_ops: bool = False, speaker_fn: callable = None, + load_agents_from_csv: bool = False, + csv_file_path: str = None, *args, **kwargs, ): @@ -161,6 +162,13 @@ class SwarmRouter: self.no_cluster_ops = no_cluster_ops self.speaker_fn = speaker_fn self.logs = [] + self.load_agents_from_csv = load_agents_from_csv + self.csv_file_path = csv_file_path + + if self.load_agents_from_csv: + self.agents = AgentLoader( + csv_path=self.csv_file_path + ).load_agents() self.reliability_check() @@ -180,10 +188,6 @@ class SwarmRouter: if self.rules is not None: self.handle_rules() - def deactivate_clusterops(self): - for agent in self.agents: - agent.do_not_use_cluster_ops = True - def activate_shared_memory(self): logger.info("Activating shared memory with all agents ") @@ -273,9 +277,6 @@ class SwarmRouter: self._create_swarm(self.swarm_type) - if self.no_cluster_ops: - self.deactivate_clusterops() - elif self.swarm_type == "AgentRearrange": return AgentRearrange( name=self.name, @@ -428,10 +429,6 @@ class SwarmRouter: self, task: str, img: str = None, - device: str = "cpu", - all_cores: bool = True, - all_gpus: bool = False, - no_clusterops: bool = True, *args, **kwargs, ) -> Any: @@ -453,18 +450,7 @@ class SwarmRouter: Exception: If an error occurs during task execution. 
""" try: - if no_clusterops: - return self._run(task=task, img=img, *args, **kwargs) - else: - return exec_callable_with_clusterops( - func=self._run, - device=device, - all_cores=all_cores, - all_gpus=all_gpus, - task=task, - *args, - **kwargs, - ) + return self._run(task=task, img=img, *args, **kwargs) except Exception as e: logger.error(f"Error executing task on swarm: {str(e)}") raise @@ -515,41 +501,6 @@ class SwarmRouter: raise return results - def threaded_run(self, task: str, *args, **kwargs) -> Any: - """ - Execute a task on the selected or matched swarm type using threading. - - Args: - task (str): The task to be executed by the swarm. - *args: Variable length argument list. - **kwargs: Arbitrary keyword arguments. - - Returns: - Any: The result of the swarm's execution. - - Raises: - Exception: If an error occurs during task execution. - """ - from threading import Thread - - def run_in_thread(): - try: - result = self.run(task, *args, **kwargs) - return result - except Exception as e: - self._log( - "error", - f"Error occurred while running task in thread on {self.swarm_type} swarm: {str(e)}", - task=task, - metadata={"error": str(e)}, - ) - raise - - thread = Thread(target=run_in_thread) - thread.start() - thread.join() - return thread.result - def async_run(self, task: str, *args, **kwargs) -> Any: """ Execute a task on the selected or matched swarm type asynchronously. 
@@ -610,7 +561,9 @@ class SwarmRouter: """ from concurrent.futures import ThreadPoolExecutor - with ThreadPoolExecutor() as executor: + with ThreadPoolExecutor( + max_workers=os.cpu_count() + ) as executor: future = executor.submit(self.run, task, *args, **kwargs) result = future.result() return result diff --git a/swarms/telemetry/main.py b/swarms/telemetry/main.py index 53bfa884..1b2396f2 100644 --- a/swarms/telemetry/main.py +++ b/swarms/telemetry/main.py @@ -299,6 +299,7 @@ def log_agent_data(data_dict: dict) -> dict | None: requests.exceptions.RequestException, requests.exceptions.JSONDecodeError, ): - pass # Fail silently without any action + return None # Return None if anything goes wrong - return None # Return None if anything goes wrong + +# print(log_agent_data(get_user_device_data())) diff --git a/swarms/utils/loguru_logger.py b/swarms/utils/loguru_logger.py index af5c7239..9295e9eb 100644 --- a/swarms/utils/loguru_logger.py +++ b/swarms/utils/loguru_logger.py @@ -1,10 +1,10 @@ import os import uuid +import sys from loguru import logger def initialize_logger(log_folder: str = "logs"): - AGENT_WORKSPACE = "agent_workspace" # Check if WORKSPACE_DIR is set, if not, set it to AGENT_WORKSPACE @@ -24,14 +24,27 @@ def initialize_logger(log_folder: str = "logs"): log_folder_path, f"{log_folder}_{uuid_for_log}.log" ) + # Remove default handler and add custom handlers + logger.remove() + + # Add console handler with colors + logger.add( + sys.stdout, + colorize=True, + format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}", + level="INFO", + ) + + # Add file handler logger.add( log_file_path, + format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}", level="INFO", - colorize=True, backtrace=True, diagnose=True, enqueue=True, retention="10 days", # compression="zip", ) + return logger diff --git a/swarms/utils/wrapper_clusterop.py b/swarms/utils/wrapper_clusterop.py index 7dc6d855..6119c532 100644 
--- a/swarms/utils/wrapper_clusterop.py +++ b/swarms/utils/wrapper_clusterop.py @@ -113,3 +113,15 @@ def exec_callable_with_clusterops( if enable_logging: logger.error(f"An error occurred during execution: {e}") raise + + +# def test_clusterops(x): +# return x + 1 + +# example = exec_callable_with_clusterops( +# device="cpu", +# all_cores=True, +# func = test_clusterops, +# ) + +# print(example) diff --git a/tests/agent_evals/auto_test_eval.py b/tests/agent_evals/auto_test_eval.py index b9c770fa..06a1c839 100644 --- a/tests/agent_evals/auto_test_eval.py +++ b/tests/agent_evals/auto_test_eval.py @@ -81,12 +81,6 @@ class SwarmsIssueReporter: # Initialize logging log_path = os.path.join(os.getcwd(), "logs", log_file) os.makedirs(os.path.dirname(log_path), exist_ok=True) - logger.add( - log_path, - rotation="1 day", - retention="1 month", - compression="zip", - ) # Issue tracking self.issues_created = [] diff --git a/tests/agent_evals/github_summarizer_agent.py b/tests/agent_evals/github_summarizer_agent.py index c461c307..17da45dc 100644 --- a/tests/agent_evals/github_summarizer_agent.py +++ b/tests/agent_evals/github_summarizer_agent.py @@ -9,15 +9,6 @@ from swarm_models import OpenAIChat GITHUB_REPO = "kyegomez/swarms" # Swarms GitHub repository GITHUB_API_URL = f"https://api.github.com/repos/{GITHUB_REPO}/commits" -# Initialize Loguru -logger.add( - "commit_summary.log", - rotation="1 MB", - level="INFO", - backtrace=True, - diagnose=True, -) - # Step 1: Fetch the latest commits from GitHub def fetch_latest_commits( diff --git a/tests/structs/test_multiprocess.py b/tests/structs/test_multiprocess.py deleted file mode 100644 index 92d5dc83..00000000 --- a/tests/structs/test_multiprocess.py +++ /dev/null @@ -1,177 +0,0 @@ -import asyncio -import time -from swarms.structs.agent import Agent -from swarms.structs.multi_process_workflow import MultiProcessWorkflow - - -def create_test_agent(name: str) -> Agent: - """Create a test agent that simply returns its input with 
a timestamp""" - return Agent( - agent_name=name, - system_prompt=f"Test prompt for {name}", - model_name="gpt-4o-mini", - max_loops=1, - ) - - -def test_initialization(): - """Test basic workflow initialization""" - print("\n=== Testing Workflow Initialization ===") - try: - agents = [create_test_agent(f"agent{i}") for i in range(3)] - workflow = MultiProcessWorkflow(max_workers=2, agents=agents) - - print("✓ Created workflow with configuration:") - print(f" - Max workers: {workflow.max_workers}") - print(f" - Number of agents: {len(workflow.agents)}") - print(f" - Autosave: {workflow.autosave}") - print("✓ Initialization test passed") - except Exception as e: - print(f"✗ Initialization test failed: {str(e)}") - raise - - -def test_execute_task(): - """Test execution of a single task""" - print("\n=== Testing Task Execution ===") - try: - agents = [create_test_agent("test_agent")] - workflow = MultiProcessWorkflow(agents=agents) - - test_task = "Return this message with timestamp" - result = workflow.execute_task(test_task) - - print("✓ Task executed successfully") - print(f" - Input task: {test_task}") - print(f" - Result: {result}") - print("✓ Task execution test passed") - except Exception as e: - print(f"✗ Task execution test failed: {str(e)}") - raise - - -def test_parallel_run(): - """Test parallel execution of tasks""" - print("\n=== Testing Parallel Run ===") - try: - agents = [create_test_agent(f"agent{i}") for i in range(3)] - workflow = MultiProcessWorkflow(max_workers=2, agents=agents) - - test_task = "Process this in parallel" - results = workflow.run(test_task) - - print("✓ Parallel execution completed") - # print(f" - Number of results: {len(results)}") - print(f" - Results: {results}") - print("✓ Parallel run test passed") - except Exception as e: - print(f"✗ Parallel run test failed: {str(e)}") - raise - - -async def test_async_run(): - """Test asynchronous execution of tasks""" - print("\n=== Testing Async Run ===") - try: - agents = 
[create_test_agent(f"agent{i}") for i in range(3)] - workflow = MultiProcessWorkflow(max_workers=2, agents=agents) - - test_task = "Process this asynchronously" - results = await workflow.async_run(test_task) - - print("✓ Async execution completed") - print(f" - Number of results: {len(results)}") - print(f" - Results: {results}") - print("✓ Async run test passed") - except Exception as e: - print(f"✗ Async run test failed: {str(e)}") - raise - - -def test_batched_run(): - """Test batch execution of tasks""" - print("\n=== Testing Batched Run ===") - try: - agents = [create_test_agent(f"agent{i}") for i in range(2)] - workflow = MultiProcessWorkflow(max_workers=2, agents=agents) - - tasks = [f"Batch task {i}" for i in range(5)] - results = workflow.batched_run(tasks, batch_size=2) - - print("✓ Batch execution completed") - print(f" - Number of tasks: {len(tasks)}") - print(" - Batch size: 2") - print(f" - Results: {results}") - print("✓ Batched run test passed") - except Exception as e: - print(f"✗ Batched run test failed: {str(e)}") - raise - - -def test_concurrent_run(): - """Test concurrent execution of tasks""" - print("\n=== Testing Concurrent Run ===") - try: - agents = [create_test_agent(f"agent{i}") for i in range(2)] - workflow = MultiProcessWorkflow(max_workers=2, agents=agents) - - tasks = [f"Concurrent task {i}" for i in range(4)] - results = workflow.concurrent_run(tasks) - - print("✓ Concurrent execution completed") - print(f" - Number of tasks: {len(tasks)}") - print(f" - Results: {results}") - print("✓ Concurrent run test passed") - except Exception as e: - print(f"✗ Concurrent run test failed: {str(e)}") - raise - - -def test_error_handling(): - """Test error handling in workflow""" - print("\n=== Testing Error Handling ===") - try: - # Create workflow with no agents to trigger error - workflow = MultiProcessWorkflow(max_workers=2, agents=None) - result = workflow.execute_task( - "This should handle the error gracefully" - ) - - print("✓ Error 
handled gracefully") - print(f" - Result when no agents: {result}") - print("✓ Error handling test passed") - except Exception as e: - print(f"✗ Error handling test failed: {str(e)}") - raise - - -async def run_all_tests(): - """Run all tests""" - print("\n=== Starting MultiProcessWorkflow Test Suite ===") - start_time = time.time() - - try: - # Run synchronous tests - test_initialization() - test_execute_task() - test_parallel_run() - test_batched_run() - test_concurrent_run() - test_error_handling() - - # Run async test - await test_async_run() - - end_time = time.time() - duration = round(end_time - start_time, 2) - print("\n=== Test Suite Completed Successfully ===") - print(f"Time taken: {duration} seconds") - - except Exception as e: - print("\n=== Test Suite Failed ===") - print(f"Error: {str(e)}") - raise - - -if __name__ == "__main__": - asyncio.run(run_all_tests())