diff --git a/.gitignore b/.gitignore index e775b0cc..b2cb8f9e 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,7 @@ static/generated runs artifacts_five chroma +Accounting Assistant_state.json Unit Testing Agent_state.json Devin_state.json json_logs diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 0e1e6bad..29b98dc4 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -141,12 +141,12 @@ nav: - BaseStructure: "swarms/structs/basestructure.md" - Task: "swarms/structs/task.md" - YamlModel: "swarms/structs/yaml_model.md" - - Workflows: - - BaseWorkflow: "swarms/structs/base_workflow.md" - - ConcurrentWorkflow: "swarms/structs/concurrentworkflow.md" - - SequentialWorkflow: "swarms/structs/sequential_workflow.md" - - MultiProcessingWorkflow: "swarms/structs/multi_processing_workflow.md" - Multi Agent Architectures: + - Workflows: + - BaseWorkflow: "swarms/structs/base_workflow.md" + - ConcurrentWorkflow: "swarms/structs/concurrentworkflow.md" + - SequentialWorkflow: "swarms/structs/sequential_workflow.md" + - MultiProcessingWorkflow: "swarms/structs/multi_processing_workflow.md" - Conversation: "swarms/structs/conversation.md" - SwarmNetwork: "swarms/structs/swarmnetwork.md" - MajorityVoting: "swarms/structs/majorityvoting.md" @@ -158,6 +158,7 @@ nav: - AutoSwarmRouter: "swarms/structs/auto_swarm_router.md" - AutoSwarm: "swarms/structs/auto_swarm.md" - GroupChat: "swarms/structs/group_chat.md" + - AgentRegistry: "swarms/structs/agent_registry.md" - Swarms Cloud API: - Overview: "swarms_cloud/main.md" - Available Models: "swarms_cloud/available_models.md" diff --git a/docs/swarms/structs/agent_registry.md b/docs/swarms/structs/agent_registry.md new file mode 100644 index 00000000..be267b4f --- /dev/null +++ b/docs/swarms/structs/agent_registry.md @@ -0,0 +1,155 @@ +# AgentRegistry Documentation + +The `AgentRegistry` class is designed to manage a collection of agents, providing methods for adding, deleting, updating, and querying agents. This class ensures thread-safe operations on the registry, making it suitable for concurrent environments. Additionally, the `AgentModel` class is a Pydantic model used for validating and storing agent information. + +## Attributes + +### AgentModel + +| Attribute | Type | Description | +|-----------|--------|--------------------------------------| +| `agent_id`| `str` | The unique identifier for the agent. | +| `agent` | `Agent`| The agent object. | + +### AgentRegistry + +| Attribute | Type | Description | +|-----------|---------------------|-------------------------------------------| +| `agents` | `Dict[str, AgentModel]` | A dictionary mapping agent IDs to `AgentModel` instances. | +| `lock` | `Lock` | A threading lock for thread-safe operations. | + +## Methods + +### `__init__(self)` + +Initializes the `AgentRegistry` object. + +- **Usage Example:** + ```python + registry = AgentRegistry() + ``` + +### `add(self, agent_id: str, agent: Agent) -> None` + +Adds a new agent to the registry. + +- **Parameters:** + - `agent_id` (`str`): The unique identifier for the agent. + - `agent` (`Agent`): The agent to add. + +- **Raises:** + - `ValueError`: If the agent ID already exists in the registry. + - `ValidationError`: If the input data is invalid. + +- **Usage Example:** + ```python + agent = Agent(agent_name="Agent1") + registry.add("agent_1", agent) + ``` + +### `delete(self, agent_id: str) -> None` + +Deletes an agent from the registry. + +- **Parameters:** + - `agent_id` (`str`): The unique identifier for the agent to delete. 
+ +- **Raises:** + - `KeyError`: If the agent ID does not exist in the registry. + +- **Usage Example:** + ```python + registry.delete("agent_1") + ``` + +### `update_agent(self, agent_id: str, new_agent: Agent) -> None` + +Updates an existing agent in the registry. + +- **Parameters:** + - `agent_id` (`str`): The unique identifier for the agent to update. + - `new_agent` (`Agent`): The new agent to replace the existing one. + +- **Raises:** + - `KeyError`: If the agent ID does not exist in the registry. + - `ValidationError`: If the input data is invalid. + +- **Usage Example:** + ```python + new_agent = Agent(agent_name="UpdatedAgent") + registry.update_agent("agent_1", new_agent) + ``` + +### `get(self, agent_id: str) -> Agent` + +Retrieves an agent from the registry. + +- **Parameters:** + - `agent_id` (`str`): The unique identifier for the agent to retrieve. + +- **Returns:** + - `Agent`: The agent associated with the given agent ID. + +- **Raises:** + - `KeyError`: If the agent ID does not exist in the registry. + +- **Usage Example:** + ```python + agent = registry.get("agent_1") + ``` + +### `list_agents(self) -> List[str]` + +Lists all agent identifiers in the registry. + +- **Returns:** + - `List[str]`: A list of all agent identifiers. + +- **Usage Example:** + ```python + agent_ids = registry.list_agents() + ``` + +### `query(self, condition: Optional[Callable[[Agent], bool]] = None) -> List[Agent]` + +Queries agents based on a condition. + +- **Parameters:** + - `condition` (`Optional[Callable[[Agent], bool]]`): A function that takes an agent and returns a boolean indicating whether the agent meets the condition. Defaults to `None`. + +- **Returns:** + - `List[Agent]`: A list of agents that meet the condition. + +- **Usage Example:** + ```python + def is_active(agent): + return agent.is_active + + active_agents = registry.query(is_active) + ``` + +### `find_agent_by_name(self, agent_name: str) -> Agent` + +Finds an agent by its name. + +- **Parameters:** + - `agent_name` (`str`): The name of the agent to find. + +- **Returns:** + - `Agent`: The agent with the specified name. + +- **Usage Example:** + ```python + agent = registry.find_agent_by_name("Agent1") + ``` + +## Logging and Error Handling + +Each method in the `AgentRegistry` class includes logging to track the execution flow and captures errors to provide detailed information in case of failures. This is crucial for debugging and ensuring smooth operation of the registry. The `report_error` function is used for reporting exceptions that occur during method execution. + +## Additional Tips + +- Ensure that agents provided to the `AgentRegistry` are properly initialized and configured to handle the tasks they will receive. +- Utilize the logging information to monitor and debug the registry operations. +- Use the `lock` attribute to ensure thread-safe operations when accessing or modifying the registry. + diff --git a/docs/swarms/structs/sequential_workflow.md b/docs/swarms/structs/sequential_workflow.md index 74ee1acb..05b047b6 100644 --- a/docs/swarms/structs/sequential_workflow.md +++ b/docs/swarms/structs/sequential_workflow.md @@ -1,606 +1,89 @@ -# `SequentialWorkflow` Documentation +# SequentialWorkflow Documentation -The **SequentialWorkflow** class is a Python module designed to facilitate the execution of a sequence of tasks in a sequential manner. 
It is a part of the `swarms.structs` package and is particularly useful for orchestrating the execution of various callable objects, such as functions or models, in a predefined order. This documentation will provide an in-depth understanding of the **SequentialWorkflow** class, including its purpose, architecture, usage, and examples. +The `SequentialWorkflow` class is designed to manage and execute a sequence of tasks through a dynamic arrangement of agents. This class allows for the orchestration of multiple agents in a predefined order, facilitating complex workflows where tasks are processed sequentially by different agents. -## Purpose and Relevance +## Attributes -The **SequentialWorkflow** class is essential for managing and executing a series of tasks or processes, where each task may depend on the outcome of the previous one. It is commonly used in various application scenarios, including but not limited to: +| Attribute | Type | Description | +|------------------|---------------|--------------------------------------------------| +| `agents` | `List[Agent]` | The list of agents in the workflow. | +| `flow` | `str` | A string representing the order of agents. | +| `agent_rearrange`| `AgentRearrange` | Manages the dynamic execution of agents. | -1. **Natural Language Processing (NLP) Workflows:** In NLP workflows, multiple language models are employed sequentially to process and generate text. Each model may depend on the results of the previous one, making sequential execution crucial. +## Methods -2. **Data Analysis Pipelines:** Data analysis often involves a series of tasks such as data preprocessing, transformation, and modeling steps. These tasks must be performed sequentially to ensure data consistency and accuracy. +### `__init__(self, agents: List[Agent] = None, max_loops: int = 1, *args, **kwargs)` -3. **Task Automation:** In task automation scenarios, there is a need to execute a series of automated tasks in a specific order. Sequential execution ensures that each task is performed in a predefined sequence, maintaining the workflow's integrity. +The constructor initializes the `SequentialWorkflow` object. -By providing a structured approach to managing these tasks, the **SequentialWorkflow** class helps developers streamline their workflow execution and improve code maintainability. +- **Parameters:** + - `agents` (`List[Agent]`, optional): The list of agents in the workflow. Defaults to `None`. + - `max_loops` (`int`, optional): The maximum number of loops to execute the workflow. Defaults to `1`. + - `*args`: Variable length argument list. + - `**kwargs`: Arbitrary keyword arguments. -## Key Concepts and Terminology +### `run(self, task: str) -> str` -Before delving into the details of the **SequentialWorkflow** class, let's define some key concepts and terminology that will be used throughout the documentation: +Runs the specified task through the agents in the dynamically constructed flow. -### Task +- **Parameters:** + - `task` (`str`): The task for the agents to execute. -A **task** refers to a specific unit of work that needs to be executed as part of the workflow. Each task is associated with a description and can be implemented as a callable object, such as a function or a model. +- **Returns:** + - `str`: The final result after processing through all agents. -### Agent +- **Usage Example:** + ```python + from swarms import Agent, SequentialWorkflow, Anthropic -A **agent** represents a callable object that can be a task within the **SequentialWorkflow**. 
Agents encapsulate the logic and functionality of a particular task. Agents can be functions, models, or any callable object that can be executed. 
-### Sequential Execution

+    # Initialize the language model (Anthropic)
+    llm = Anthropic()

-Sequential execution refers to the process of running tasks one after the other in a predefined order. In a **SequentialWorkflow**, tasks are executed sequentially, meaning that each task starts only after the previous one has completed. 
+    # Place your key in .env

-### Workflow
+    # Initialize agents for individual tasks
+    agent1 = Agent(
+        agent_name="Blog generator",
+        system_prompt="Generate a blog post in the style of Stephen King",
+        llm=llm,
+        max_loops=1,
+        dashboard=False,
+        tools=[],
+    )
+    agent2 = Agent(
+        agent_name="summarizer",
+        system_prompt="Summarize the blog post",
+        llm=llm,
+        max_loops=1,
+        dashboard=False,
+        tools=[],
+    )

-A **workflow** is a predefined sequence of tasks that need to be executed in a specific order. It represents the overall process or pipeline that the **SequentialWorkflow** manages. 
+    # Create the Sequential workflow
+    workflow = SequentialWorkflow(
+        agents=[agent1, agent2], max_loops=1, verbose=False
+    )

-### Dashboard (Optional)
+    # Run the workflow
+    workflow.run(
+        "Generate a blog post on how swarms of agents can help businesses grow."
+    )

-A **dashboard** is an optional feature of the **SequentialWorkflow** that provides real-time monitoring and visualization of the workflow's progress. It displays information such as the current task being executed, task results, and other relevant metadata. 
+    ```

-### Max Loops
+    This example initializes a `SequentialWorkflow` with two agents and executes a task, printing the final result.

-The **maximum number of times** the entire workflow can be run. This parameter allows developers to control how many times the workflow is executed. 
+- **Notes:**

-### Autosaving
+    - Logs the task execution process and handles any exceptions that occur during the task execution.

-**Autosaving** is a feature that allows the **SequentialWorkflow** to automatically save its state to a file at specified intervals. This feature helps in resuming a workflow from where it left off, even after interruptions. 
+### Logging and Error Handling

-Now that we have a clear understanding of the key concepts and terminology, let's explore the architecture and usage of the **SequentialWorkflow** class in more detail. 
+The `run` method includes logging to track the execution flow and captures errors to provide detailed information in case of failures. This is crucial for debugging and ensuring smooth operation of the workflow.

-## Architecture of SequentialWorkflow
+## Additional Tips

-The architecture of the **SequentialWorkflow** class is designed to provide a structured and flexible way to define, manage, and execute a sequence of tasks. It comprises the following core components: 
+- Ensure that the agents provided to the `SequentialWorkflow` are properly initialized and configured to handle the tasks they will receive.

-1. **Task**: The **Task** class represents an individual unit of work within the workflow. Each task has a description, which serves as a human-readable identifier for the task. Tasks can be implemented as callable objects, allowing for great flexibility in defining their functionality. 

-2. **Workflow**: The **SequentialWorkflow** class itself represents the workflow. It manages a list of tasks in the order they should be executed. 
Workflows can be run sequentially or asynchronously, depending on the use case. - -3. **Task Execution**: Task execution is the process of running each task in the workflow. Tasks are executed one after another in the order they were added to the workflow. Task results can be passed as inputs to subsequent tasks. - -4. **Dashboard (Optional)**: The **SequentialWorkflow** optionally includes a dashboard feature. The dashboard provides a visual interface for monitoring the progress of the workflow. It displays information about the current task, task results, and other relevant metadata. - -5. **State Management**: The **SequentialWorkflow** supports state management, allowing developers to save and load the state of the workflow to and from JSON files. This feature is valuable for resuming workflows after interruptions or for sharing workflow configurations. - -## Usage of SequentialWorkflow - -The **SequentialWorkflow** class is versatile and can be employed in a wide range of applications. Its usage typically involves the following steps: - -1. **Initialization**: Begin by initializing any callable objects or flows that will serve as tasks in the workflow. These callable objects can include functions, models, or any other Python objects that can be executed. - -2. **Workflow Creation**: Create an instance of the **SequentialWorkflow** class. Specify the maximum number of loops the workflow should run and whether a dashboard should be displayed. - -3. **Task Addition**: Add tasks to the workflow using the `add` method. Each task should be described using a human-readable description, and the associated agent (callable object) should be provided. Additional arguments and keyword arguments can be passed to the task. - -4. **Task Execution**: Execute the workflow using the `run` method. The tasks within the workflow will be executed sequentially, with task results passed as inputs to subsequent tasks. - -5. **Accessing Results**: After running the workflow, you can access the results of each task using the `get_task_results` method or by directly accessing the `result` attribute of each task. - -6. **Optional Features**: Optionally, you can enable features such as autosaving of the workflow state and utilize the dashboard for real-time monitoring. - - -## Installation - -Before using the Sequential Workflow library, you need to install it. You can install it via pip: - -```bash -pip3 install --upgrade swarms -``` - -## Quick Start - -Let's begin with a quick example to demonstrate how to create and run a Sequential Workflow. In this example, we'll create a workflow that generates a 10,000-word blog on "health and wellness" using an AI model and then summarizes the generated content. 
- -```python -from swarms.models import OpenAIChat -from swarms.structs import Agent -from swarms.structs.sequential_workflow import SequentialWorkflow - -# Initialize the language model agent (e.g., GPT-3) -llm = OpenAIChat( - openai_api_key="YOUR_API_KEY", - temperature=0.5, - max_tokens=3000, -) - -# Initialize flows for individual tasks -flow1 = Agent(llm=llm, max_loops=1, dashboard=False) -flow2 = Agent(llm=llm, max_loops=1, dashboard=False) - -# Create the Sequential Workflow -workflow = SequentialWorkflow(max_loops=1) - -# Add tasks to the workflow -workflow.add("Generate a 10,000 word blog on health and wellness.", flow1) -workflow.add("Summarize the generated blog", flow2) - -# Run the workflow -workflow.run() - -# Output the results -for task in workflow.tasks: - print(f"Task: {task.description}, Result: {task.result}") -``` - -This quick example demonstrates the basic usage of the Sequential Workflow. It creates two tasks and executes them sequentially. - -## Class: `Task` - -### Description - -The `Task` class represents an individual task in the workflow. A task is essentially a callable object, such as a function or a class, that can be executed sequentially. Tasks can have arguments and keyword arguments. - -### Class Definition - -```python -class Task: - def __init__(self, description: str, agent: Union[Callable, Agent], args: List[Any] = [], kwargs: Dict[str, Any] = {}, result: Any = None, history: List[Any] = []) -``` - -### Parameters - -- `description` (str): A description of the task. -- `agent` (Union[Callable, Agent]): The callable object representing the task. It can be a function, class, or a `Agent` instance. -- `args` (List[Any]): A list of positional arguments to pass to the task when executed. Default is an empty list. -- `kwargs` (Dict[str, Any]): A dictionary of keyword arguments to pass to the task when executed. Default is an empty dictionary. -- `result` (Any): The result of the task's execution. Default is `None`. -- `history` (List[Any]): A list to store the historical results of the task. Default is an empty list. - -### Methods - -#### `execute()` - -Execute the task. - -```python -def execute(self): -``` - -This method executes the task and updates the `result` and `history` attributes of the task. It checks if the task is a `Agent` instance and if the 'task' argument is needed. - -## Class: `SequentialWorkflow` - -### Description - -The `SequentialWorkflow` class is responsible for managing a sequence of tasks and executing them in a sequential order. It provides methods for adding tasks, running the workflow, and managing the state of the tasks. - -### Class Definition - -```python -class SequentialWorkflow: - def __init__(self, max_loops: int = 1, autosave: bool = False, saved_state_filepath: Optional[str] = "sequential_workflow_state.json", restore_state_filepath: Optional[str] = None, dashboard: bool = False, tasks: List[Task] = []) -``` - -### Parameters - -- `max_loops` (int): The maximum number of times to run the workflow sequentially. Default is `1`. -- `autosave` (bool): Whether to enable autosaving of the workflow state. Default is `False`. -- `saved_state_filepath` (Optional[str]): The file path to save the workflow state when autosave is enabled. Default is `"sequential_workflow_state.json"`. -- `restore_state_filepath` (Optional[str]): The file path to restore the workflow state when initializing. Default is `None`. -- `dashboard` (bool): Whether to display a dashboard with workflow information. Default is `False`. 
-- `tasks` (List[Task]): A list of `Task` instances representing the tasks in the workflow. Default is an empty list. - -### Methods - -#### `add(task: str, agent: Union[Callable, Agent], *args, **kwargs)` - -Add a task to the workflow. - -```python -def add(self, task: str, agent: Union[Callable, Agent], *args, **kwargs) -> None: -``` - -This method adds a new task to the workflow. You can provide a description of the task, the callable object (function, class, or `Agent` instance), and any additional positional or keyword arguments required for the task. - -#### `reset_workflow()` - -Reset the workflow by clearing the results of each task. - -```python -def reset_workflow(self) -> None: -``` - -This method clears the results of each task in the workflow, allowing you to start fresh without reinitializing the workflow. - -#### `get_task_results()` - -Get the results of each task in the workflow. - -```python -def get_task_results(self) -> Dict[str, Any]: -``` - -This method returns a dictionary containing the results of each task in the workflow, where the keys are task descriptions, and the values are the corresponding results. - -#### `remove_task(task_description: str)` - -Remove a task from the workflow. - -```python -def remove_task(self, task_description: str) -> None: -``` - -This method removes a specific task from the workflow based on its description. - -#### `update_task(task_description: str, **updates)` - -Update the arguments of a task in the workflow. - -```python -def update_task(self, task_description: str, **updates) -> None: -``` - -This method allows you to update the arguments and keyword arguments of a task in the workflow. You specify the task's description and provide the updates as keyword arguments. - -#### `save_workflow_state(filepath: Optional[str] = "sequential_workflow_state.json", **kwargs)` - -Save the workflow state to a JSON file. - -```python -def save_workflow_state(self, filepath: Optional[str] = "sequential_workflow_state.json", **kwargs) -> None: -``` - -This method saves the current state of the workflow, including the results and history of each task, to a JSON file. You can specify the file path for saving the state. - -#### `load_workflow_state(filepath: str = None, **kwargs)` - -Load the workflow state from a JSON file and restore the workflow state. - -```python -def load_workflow_state(self, filepath: str = None, **kwargs) -> None: -``` - -This method loads a previously saved workflow state from a JSON file - - and restores the state, allowing you to continue the workflow from where it was saved. You can specify the file path for loading the state. - -#### `run()` - -Run the workflow sequentially. - -```python -def run(self) -> None: -``` - -This method executes the tasks in the workflow sequentially. It checks if a task is a `Agent` instance and handles the agent of data between tasks accordingly. - -#### `arun()` - -Asynchronously run the workflow. - -```python -async def arun(self) -> None: -``` - -This method asynchronously executes the tasks in the workflow sequentially. It's suitable for use cases where asynchronous execution is required. It also handles data agent between tasks. - -#### `workflow_bootup(**kwargs)` - -Display a bootup message for the workflow. - -```python -def workflow_bootup(self, **kwargs) -> None: -``` - -This method displays a bootup message when the workflow is initialized. You can customize the message by providing additional keyword arguments. 
- -#### `workflow_dashboard(**kwargs)` - -Display a dashboard for the workflow. - -```python -def workflow_dashboard(self, **kwargs) -> None: -``` - -This method displays a dashboard with information about the workflow, such as the number of tasks, maximum loops, and autosave settings. You can customize the dashboard by providing additional keyword arguments. - -## Examples - -Let's explore some examples to illustrate how to use the Sequential Workflow library effectively. - -Sure, I'll recreate the usage examples section for each method and use case using the provided foundation. Here are the examples: - -### Example 1: Adding Tasks to a Sequential Workflow - -In this example, we'll create a Sequential Workflow and add tasks to it. - -```python -from swarms.models import OpenAIChat -from swarms.structs import Agent -from swarms.structs.sequential_workflow import SequentialWorkflow - -# Example usage -api_key = "" # Your actual API key here - -# Initialize the language agent -llm = OpenAIChat( - openai_api_key=api_key, - temperature=0.5, - max_tokens=3000, -) - -# Initialize Agents for individual tasks -flow1 = Agent(llm=llm, max_loops=1, dashboard=False) -flow2 = Agent(llm=llm, max_loops=1, dashboard=False) - -# Create the Sequential Workflow -workflow = SequentialWorkflow(max_loops=1) - -# Add tasks to the workflow -workflow.add("Generate a 10,000 word blog on health and wellness.", flow1) -workflow.add("Summarize the generated blog", flow2) - -# Output the list of tasks in the workflow -print("Tasks in the workflow:") -for task in workflow.tasks: - print(f"Task: {task.description}") -``` - -In this example, we create a Sequential Workflow and add two tasks to it. - -### Example 2: Resetting a Sequential Workflow - -In this example, we'll create a Sequential Workflow, add tasks to it, and then reset it. - -```python -from swarms.models import OpenAIChat -from swarms.structs import Agent -from swarms.structs.sequential_workflow import SequentialWorkflow - -# Example usage -api_key = "" # Your actual API key here - -# Initialize the language agent -llm = OpenAIChat( - openai_api_key=api_key, - temperature=0.5, - max_tokens=3000, -) - -# Initialize Agents for individual tasks -flow1 = Agent(llm=llm, max_loops=1, dashboard=False) -flow2 = Agent(llm=llm, max_loops=1, dashboard=False) - -# Create the Sequential Workflow -workflow = SequentialWorkflow(max_loops=1) - -# Add tasks to the workflow -workflow.add("Generate a 10,000 word blog on health and wellness.", flow1) -workflow.add("Summarize the generated blog", flow2) - -# Reset the workflow -workflow.reset_workflow() - -# Output the list of tasks in the workflow after resetting -print("Tasks in the workflow after resetting:") -for task in workflow.tasks: - print(f"Task: {task.description}") -``` - -In this example, we create a Sequential Workflow, add two tasks to it, and then reset the workflow, clearing all task results. - -### Example 3: Getting Task Results from a Sequential Workflow - -In this example, we'll create a Sequential Workflow, add tasks to it, run the workflow, and then retrieve the results of each task. 
- -```python -from swarms.models import OpenAIChat -from swarms.structs import Agent -from swarms.structs.sequential_workflow import SequentialWorkflow - -# Example usage -api_key = "" # Your actual API key here - -# Initialize the language agent -llm = OpenAIChat( - openai_api_key=api_key, - temperature=0.5, - max_tokens=3000, -) - -# Initialize Agents for individual tasks -flow1 = Agent(llm=llm, max_loops=1, dashboard=False) -flow2 = Agent(llm=llm, max_loops=1, dashboard=False) - -# Create the Sequential Workflow -workflow = SequentialWorkflow(max_loops=1) - -# Add tasks to the workflow -workflow.add("Generate a 10,000 word blog on health and wellness.", flow1) -workflow.add("Summarize the generated blog", flow2) - -# Run the workflow -workflow.run() - -# Get and display the results of each task in the workflow -results = workflow.get_task_results() -for task_description, result in results.items(): - print(f"Task: {task_description}, Result: {result}") -``` - -In this example, we create a Sequential Workflow, add two tasks to it, run the workflow, and then retrieve and display the results of each task. - -### Example 4: Removing a Task from a Sequential Workflow - -In this example, we'll create a Sequential Workflow, add tasks to it, and then remove a specific task from the workflow. - -```python -from swarms.models import OpenAIChat -from swarms.structs import Agent -from swarms.structs.sequential_workflow import SequentialWorkflow - -# Example usage -api_key = "" # Your actual API key here - -# Initialize the language agent -llm = OpenAIChat( - openai_api_key=api_key, - temperature=0.5, - max_tokens=3000, -) - -# Initialize Agents for individual tasks -flow1 = Agent(llm=llm, max_loops=1, dashboard=False) -flow2 = Agent(llm=llm, max_loops=1, dashboard=False) - -# Create the Sequential Workflow -workflow = SequentialWorkflow(max_loops=1) - -# Add tasks to the workflow -workflow.add("Generate a 10,000 word blog on health and wellness.", flow1) -workflow.add("Summarize the generated blog", flow2) - -# Remove a specific task from the workflow -workflow.remove_task("Generate a 10,000 word blog on health and wellness.") - -# Output the list of tasks in the workflow after removal -print("Tasks in the workflow after removing a task:") -for task in workflow.tasks: - print(f"Task: {task.description}") -``` - -In this example, we create a Sequential Workflow, add two tasks to it, and then remove a specific task from the workflow. - -### Example 5: Updating Task Arguments in a Sequential Workflow - -In this example, we'll create a Sequential Workflow, add tasks to it, and then update the arguments of a specific task in the workflow. 
- -```python -from swarms.models import OpenAIChat -from swarms.structs import Agent -from swarms.structs.sequential_workflow import SequentialWorkflow - -# Example usage -api_key = ( - "" # Your actual API key here -) - -# Initialize the language agent -llm = OpenAIChat( - openai_api_key=api_key, - temperature=0.5, - max_tokens=3000, -) - -# Initialize Agents for individual tasks -flow1 = Agent(llm=llm, max_loops=1, dashboard=False) -flow2 = Agent(llm=llm, max_loops=1, dashboard=False) - -# Create the Sequential Workflow -workflow = SequentialWorkflow(max_loops=1) - -# Add tasks to the workflow -workflow.add("Generate a 10,000 word blog on health and wellness.", flow1) -workflow.add("Summarize the generated blog", flow2) - -# Update the arguments of a specific task in the workflow -workflow.update_task("Generate a 10,000 word blog on health and wellness.", max_loops=2) - -# Output the list of tasks in the workflow after updating task arguments -print("Tasks in the workflow after updating task arguments:") -for task in workflow.tasks: - print(f"Task: {task.description}, Arguments: { - -task.arguments}") -``` - -In this example, we create a Sequential Workflow, add two tasks to it, and then update the arguments of a specific task in the workflow. - -These examples demonstrate various operations and use cases for working with a Sequential Workflow. - -# Why `SequentialWorkflow`? - -## Enhancing Autonomous Agent Development - -The development of autonomous agents, whether they are conversational AI, robotic systems, or any other AI-driven application, often involves complex workflows that require a sequence of tasks to be executed in a specific order. Managing and orchestrating these tasks efficiently is crucial for building reliable and effective agents. The Sequential Workflow module serves as a valuable tool for AI engineers in achieving this goal. - -## Reliability and Coordination - -One of the primary challenges in autonomous agent development is ensuring that tasks are executed in the correct sequence and that the results of one task can be used as inputs for subsequent tasks. The Sequential Workflow module simplifies this process by allowing AI engineers to define and manage workflows in a structured and organized manner. - -By using the Sequential Workflow module, AI engineers can achieve the following benefits: - -### 1. Improved Reliability - -Reliability is a critical aspect of autonomous agents. The ability to handle errors gracefully and recover from failures is essential for building robust systems. The Sequential Workflow module offers a systematic approach to task execution, making it easier to handle errors, retry failed tasks, and ensure that the agent continues to operate smoothly. - -### 2. Task Coordination - -Coordinating tasks in the correct order is essential for achieving the desired outcome. The Sequential Workflow module enforces task sequencing, ensuring that each task is executed only when its dependencies are satisfied. This eliminates the risk of executing tasks out of order, which can lead to incorrect results. - -### 3. Code Organization - -Managing complex workflows can become challenging without proper organization. The Sequential Workflow module encourages AI engineers to structure their code in a modular and maintainable way. Each task can be encapsulated as a separate unit, making it easier to understand, modify, and extend the agent's behavior. - -### 4. 
Workflow Visualization - -Visualization is a powerful tool for understanding and debugging workflows. The Sequential Workflow module can be extended to include a visualization dashboard, allowing AI engineers to monitor the progress of tasks, track results, and identify bottlenecks or performance issues. - -## TODO: Future Features - -While the Sequential Workflow module offers significant advantages, there are opportunities for further enhancement. Here is a list of potential features and improvements that can be added to make it even more versatile and adaptable for various AI engineering tasks: - -### 1. Asynchronous Support - -Adding support for asynchronous task execution can improve the efficiency of workflows, especially when dealing with tasks that involve waiting for external events or resources. - -### 2. Context Managers - -Introducing context manager support for tasks can simplify resource management, such as opening and closing files, database connections, or network connections within a task's context. - -### 3. Workflow History - -Maintaining a detailed history of workflow execution, including timestamps, task durations, and input/output data, can facilitate debugging and performance analysis. - -### 4. Parallel Processing - -Enhancing the module to support parallel processing with a pool of workers can significantly speed up the execution of tasks, especially for computationally intensive workflows. - -### 5. Error Handling Strategies - -Providing built-in error handling strategies, such as retries, fallbacks, and custom error handling functions, can make the module more robust in handling unexpected failures. - -## Conclusion - -The Sequential Workflow module is a valuable tool for AI engineers working on autonomous agents and complex AI-driven applications. It offers a structured and reliable approach to defining and executing workflows, ensuring that tasks are performed in the correct sequence. By using this module, AI engineers can enhance the reliability, coordination, and maintainability of their agents. - -As the field of AI continues to evolve, the demand for efficient workflow management tools will only increase. The Sequential Workflow module is a step towards meeting these demands and empowering AI engineers to create more reliable and capable autonomous agents. With future enhancements and features, it has the potential to become an indispensable asset in the AI engineer's toolkit. - -In summary, the Sequential Workflow module provides a foundation for orchestrating complex tasks and workflows, enabling AI engineers to focus on designing intelligent agents that can perform tasks with precision and reliability. - - -## Frequently Asked Questions (FAQs) - -### Q1: What is the difference between a task and a agent in Sequential Workflows? - -**A1:** In Sequential Workflows, a **task** refers to a specific unit of work that needs to be executed. It can be implemented as a callable object, such as a Python function, and is the fundamental building block of a workflow. - -A **agent**, on the other hand, is an encapsulation of a task within the workflow. Agents define the order in which tasks are executed and can be thought of as task containers. They allow you to specify dependencies, error handling, and other workflow-related configurations. - -### Q2: Can I run tasks in parallel within a Sequential Workflow? - -**A2:** Yes, you can run tasks in parallel within a Sequential Workflow by using parallel execution techniques. 
This advanced feature allows you to execute multiple tasks concurrently, improving performance and efficiency. You can explore this feature further in the guide's section on "Parallel Execution." - -### Q3: How do I handle errors within Sequential Workflows? - -**A3:** Error handling within Sequential Workflows can be implemented by adding error-handling logic within your task functions. You can catch exceptions and handle errors gracefully, ensuring that your workflow can recover from unexpected scenarios. The guide also covers more advanced error handling strategies, such as retrying failed tasks and handling specific error types. - -### Q4: What are some real-world use cases for Sequential Workflows? - -**A4:** Sequential Workflows can be applied to a wide range of real-world use cases, including: - -- **Data ETL (Extract, Transform, Load) Processes:** Automating data pipelines that involve data extraction, transformation, and loading into databases or data warehouses. - -- **Batch Processing:** Running batch jobs that process large volumes of data or perform data analysis. - -- **Automation of DevOps Tasks:** Streamlining DevOps processes such as deployment, provisioning, and monitoring. - -- **Cross-system Integrations:** Automating interactions between different systems, services, or APIs. - -- **Report Generation:** Generating reports and documents automatically based on data inputs. - -- **Workflow Orchestration:** Orchestrating complex workflows involving multiple steps and dependencies. - -- **Resource Provisioning:** Automatically provisioning and managing cloud resources. - -These are just a few examples, and Sequential Workflows can be tailored to various automation needs across industries. +- The `max_loops` parameter can be used to control how many times the workflow should be executed, which is useful for iterative processes. +- Utilize the logging information to monitor and debug the task execution process. diff --git a/example.py b/example.py index 98f95742..c8319346 100644 --- a/example.py +++ b/example.py @@ -1,4 +1,5 @@ -from swarms import Agent, OpenAIChat +from swarms import Agent +from langchain_community.llms.anthropic import Anthropic def calculate_profit(revenue: float, expenses: float): @@ -29,21 +30,59 @@ def generate_report(company_name: str, profit: float): return f"The profit for {company_name} is ${profit}." +EMAIL_DETECT_APPOINT = """ + +if the user gives you an email address, then call the appointment function to schedule a meeting with the user. + +SCHEMA OF THE FUNCTION: + + +""" + + +def write_memory_to_rag(memory_name: str, memory: str): + """ + Writes the memory to the RAG model for fine-tuning. + + Args: + memory_name (str): The name of the memory. + memory (str): The memory to be written to the RAG model. 
+ """ + # Write the memory to the RAG model for fine-tuning + from playground.memory.chromadb_example import ChromaDB + + db = ChromaDB(output_dir=memory_name) + + db.add(memory) + + return None + + # Initialize the agent agent = Agent( agent_name="Accounting Assistant", system_prompt="You're the accounting agent, your purpose is to generate a profit report for a company!", agent_description="Generate a profit report for a company!", - llm=OpenAIChat(), - max_loops=1, + llm=Anthropic(), + max_loops="auto", autosave=True, + sop_list=[EMAIL_DETECT_APPOINT], # dynamic_temperature_enabled=True, dashboard=False, verbose=True, streaming_on=True, # interactive=True, # Set to False to disable interactive mode saved_state_path="accounting_agent.json", - tools=[calculate_profit, generate_report], + # tools=[ + # # calculate_profit, + # # generate_report, + # # search_knowledge_base, + # # write_memory_to_rag, + # # search_knowledge_base, + # # generate_speech, + # ], + stopping_token="Stop!", + interactive=True, # docs_folder="docs", # pdf_path="docs/accounting_agent.pdf", # sop="Calculate the profit for a company.", @@ -55,9 +94,11 @@ agent = Agent( # context_length=1000, # tool_schema = dict context_length=1000, + # agent_ops_on=True, + # tree_of_thoughts=True, # long_term_memory=ChromaDB(docs_folder="artifacts"), ) agent.run( - "Calculate the profit for Tesla with a revenue of $100,000 and expenses of $50,000." + "Search the knowledge base for the swarms github framework and how it works" ) diff --git a/scripts/cleanup/json_log_cleanup.py b/json_log_cleanup.py similarity index 97% rename from scripts/cleanup/json_log_cleanup.py rename to json_log_cleanup.py index 2fb4c889..c2704d81 100644 --- a/scripts/cleanup/json_log_cleanup.py +++ b/json_log_cleanup.py @@ -41,7 +41,7 @@ def cleanup_json_logs(name: str = None): if os.path.exists(ruff_cache): shutil.rmtree(ruff_cache) logger.info(f"Deleted Ruff cache at {ruff_cache}") - + if os.path.exists(dist_cache): shutil.rmtree(dist_cache) logger.info(f"Deleted the dist cache at {dist_cache}") @@ -58,4 +58,4 @@ def cleanup_json_logs(name: str = None): # Call the function -cleanup_json_logs("artifactss_23") +cleanup_json_logs("sequential_workflow_agents") diff --git a/playground/tools/func_calling_schema.py b/playground/tools/func_calling_schema.py new file mode 100644 index 00000000..da0ccc13 --- /dev/null +++ b/playground/tools/func_calling_schema.py @@ -0,0 +1,13 @@ +import json +from swarms.tools.py_func_to_openai_func_str import ( + get_openai_function_schema_from_func, +) +from swarms.tools.prebuilt.bing_api import fetch_web_articles_bing_api + +out = get_openai_function_schema_from_func( + fetch_web_articles_bing_api, + name="fetch_web_articles_bing_api", + description="Fetches four articles from Bing Web Search API based on the given query.", +) +out = json.dumps(out, indent=2) +print(out) diff --git a/pyproject.toml b/pyproject.toml index 04d5eaf7..4720fe41 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "5.2.8" +version = "5.3.2" description = "Swarms - Pytorch" license = "MIT" authors = ["Kye Gomez "] diff --git a/swarms/artifacts/__init__.py b/swarms/artifacts/__init__.py index be270761..020887c6 100644 --- a/swarms/artifacts/__init__.py +++ b/swarms/artifacts/__init__.py @@ -1,9 +1,10 @@ from swarms.artifacts.base_artifact import BaseArtifact from swarms.artifacts.text_artifact import TextArtifact -from swarms.artifacts.artifact_main import 
Artifact + +# from swarms.artifacts.claude_like_artifact import Artifact __all__ = [ "BaseArtifact", "TextArtifact", - "Artifact", + # "Artifact", ] diff --git a/swarms/cli/__init__.py b/swarms/cli/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/swarms/cli/main.py b/swarms/cli/main.py new file mode 100644 index 00000000..f80f07f8 --- /dev/null +++ b/swarms/cli/main.py @@ -0,0 +1,91 @@ +import argparse +from swarms.cli.parse_yaml import ( + create_agent_from_yaml, + run_agent, + list_agents, +) + +SWARMS_LOGO = """ + _________ + / _____/_ _ _______ _______ _____ ______ + \_____ \\ \/ \/ /\__ \\_ __ \/ \ / ___/ + / \\ / / __ \| | \/ Y Y \\___ \ +/_______ / \/\_/ (____ /__| |__|_| /____ > + \/ \/ \/ \/ +""" + +RED_COLOR_CODE = "\033[91m" +RESET_COLOR_CODE = "\033[0m" + +# print(RED_COLOR_CODE + SWARMS_LOGO + RESET_COLOR_CODE) + + +def main(): + parser = argparse.ArgumentParser( + description=f""" + + {SWARMS_LOGO} + CLI for managing and running swarms agents. + + """ + ) + subparsers = parser.add_subparsers( + dest="command", help="Available commands" + ) + + # create agent command + create_parser = subparsers.add_parser( + "create", help="Create a new agent from a YAML file" + ) + create_parser.add_argument( + "agent", type=str, help="Path to the YAML file" + ) + + # run agent command + run_parser = subparsers.add_parser( + "run", help="Run an agent with a specified task" + ) + run_parser.add_argument( + "agent_name", type=str, help="Name of the agent to run" + ) + run_parser.add_argument( + "task", type=str, help="Task for the agent to execute" + ) + + # list agents command + subparsers.add_parser("list", help="List all agents") + + # Additional help options + parser.add_argument( + "--issue", + action="store_true", + help="Open an issue on GitHub: https://github.com/kyegomez/swarms/issues/new/choose", + ) + parser.add_argument( + "--community", + action="store_true", + help="Join our community on Discord: https://discord.com/servers/agora-999382051935506503", + ) + + args = parser.parse_args() + + if args.issue: + print( + "Open an issue on GitHub: https://github.com/kyegomez/swarms/issues/new/choose" + ) + elif args.community: + print( + "Join our community on Discord: https://discord.com/servers/agora-999382051935506503" + ) + elif args.command == "create": + create_agent_from_yaml(args.agent) + elif args.command == "run": + run_agent(args.agent_name, args.task) + elif args.command == "list agents": + list_agents() + else: + parser.print_help() + + +if __name__ == "__main__": + main() diff --git a/swarms/cli/parse_yaml.py b/swarms/cli/parse_yaml.py new file mode 100644 index 00000000..e7ba841f --- /dev/null +++ b/swarms/cli/parse_yaml.py @@ -0,0 +1,120 @@ +from swarms.utils.loguru_logger import logger +import yaml +from pydantic import BaseModel +from typing import List, Optional +import json +from swarms.structs.agent_registry import AgentRegistry +from swarms.structs.agent import Agent +from swarms.models.popular_llms import OpenAIChat + + +class AgentInput(BaseModel): + agent_name: str = "Swarm Agent" + system_prompt: Optional[str] = None + agent_description: Optional[str] = None + model_name: str = "OpenAIChat" + max_loops: int = 1 + autosave: bool = False + dynamic_temperature_enabled: bool = False + dashboard: bool = False + verbose: bool = False + streaming_on: bool = True + saved_state_path: Optional[str] = None + sop: Optional[str] = None + sop_list: Optional[List[str]] = None + user_name: str = "User" + retry_attempts: int = 3 + context_length: int = 8192 
+ task: Optional[str] = None + interactive: bool = False + + +def parse_yaml_to_json(yaml_str: str) -> str: + """ + Parses the given YAML string into an AgentInput model and converts it to a JSON string. + + Args: + yaml_str (str): The YAML string to be parsed. + + Returns: + str: The JSON string representation of the parsed YAML. + + Raises: + ValueError: If the YAML string cannot be parsed into the AgentInput model. + """ + try: + data = yaml.safe_load(yaml_str) + agent_input = AgentInput(**data) + return agent_input.json() + except yaml.YAMLError as e: + print(f"YAML Error: {e}") + raise ValueError("Invalid YAML input.") from e + except ValueError as e: + print(f"Validation Error: {e}") + raise ValueError("Invalid data for AgentInput model.") from e + + +# # Example usage +# yaml_input = """ +# agent_name: "Custom Agent" +# system_prompt: "System prompt example" +# agent_description: "This is a test agent" +# model_name: "CustomModel" +# max_loops: 5 +# autosave: true +# dynamic_temperature_enabled: true +# dashboard: true +# verbose: true +# streaming_on: false +# saved_state_path: "/path/to/state" +# sop: "Standard operating procedure" +# sop_list: ["step1", "step2"] +# user_name: "Tester" +# retry_attempts: 5 +# context_length: 4096 +# task: "Perform testing" +# """ + +# json_output = parse_yaml_to_json(yaml_input) +# print(json_output) + +registry = AgentRegistry() + + +def create_agent_from_yaml(yaml_path: str) -> None: + with open(yaml_path, "r") as file: + yaml_str = file.read() + agent_json = parse_yaml_to_json(yaml_str) + agent_config = json.loads(agent_json) + + agent = Agent( + agent_name=agent_config.get("agent_name", "Swarm Agent"), + system_prompt=agent_config.get("system_prompt"), + agent_description=agent_config.get("agent_description"), + llm=OpenAIChat(), + max_loops=agent_config.get("max_loops", 1), + autosave=agent_config.get("autosave", False), + dynamic_temperature_enabled=agent_config.get( + "dynamic_temperature_enabled", False + ), + dashboard=agent_config.get("dashboard", False), + verbose=agent_config.get("verbose", False), + streaming_on=agent_config.get("streaming_on", True), + saved_state_path=agent_config.get("saved_state_path"), + retry_attempts=agent_config.get("retry_attempts", 3), + context_length=agent_config.get("context_length", 8192), + ) + + registry.add(agent.agent_name, agent) + logger.info(f"Agent {agent.agent_name} created from {yaml_path}.") + + +def run_agent(agent_name: str, task: str) -> None: + agent = registry.find_agent_by_name(agent_name) + agent.run(task) + + +def list_agents() -> None: + agents = registry.list_agents() + for agent_id in agents: + print(agent_id) diff --git a/swarms/structs/agent_registry.py b/swarms/structs/agent_registry.py new file mode 100644 index 00000000..7d4ed5f2 --- /dev/null +++ b/swarms/structs/agent_registry.py @@ -0,0 +1,176 @@ +from typing import List, Dict, Optional, Callable +from pydantic import BaseModel, ValidationError +from threading import Lock +from swarms import Agent +from swarms.utils.loguru_logger import logger +from swarms.utils.report_error_loguru import report_error + + +class AgentModel(BaseModel): + """ + Pydantic model for an Agent. + """ + + agent_id: str + agent: Agent + + +class AgentRegistry: + """ + A registry for managing agents, with methods to add, delete, update, and query agents. + """ + + def __init__(self): + self.agents: Dict[str, AgentModel] = {} + self.lock = Lock() + + def add(self, agent_id: str, agent: Agent) -> None: + """ + Adds a new agent to the registry. 
+ + Args: + agent_id (str): The unique identifier for the agent. + agent (Agent): The agent to add. + + Raises: + ValueError: If the agent_id already exists in the registry. + ValidationError: If the input data is invalid. + """ + with self.lock: + if agent_id in self.agents: + logger.error(f"Agent with id {agent_id} already exists.") + raise ValueError( + f"Agent with id {agent_id} already exists." + ) + try: + self.agents[agent_id] = AgentModel( + agent_id=agent_id, agent=agent + ) + logger.info(f"Agent {agent_id} added successfully.") + except ValidationError as e: + logger.error(f"Validation error: {e}") + raise + + def delete(self, agent_id: str) -> None: + """ + Deletes an agent from the registry. + + Args: + agent_id (str): The unique identifier for the agent to delete. + + Raises: + KeyError: If the agent_id does not exist in the registry. + """ + with self.lock: + try: + del self.agents[agent_id] + logger.info(f"Agent {agent_id} deleted successfully.") + except KeyError as e: + logger.error(f"Error: {e}") + raise + + def update_agent(self, agent_id: str, new_agent: Agent) -> None: + """ + Updates an existing agent in the registry. + + Args: + agent_id (str): The unique identifier for the agent to update. + new_agent (Agent): The new agent to replace the existing one. + + Raises: + KeyError: If the agent_id does not exist in the registry. + ValidationError: If the input data is invalid. + """ + with self.lock: + if agent_id not in self.agents: + logger.error(f"Agent with id {agent_id} does not exist.") + raise KeyError(f"Agent with id {agent_id} does not exist.") + try: + self.agents[agent_id] = AgentModel( + agent_id=agent_id, agent=new_agent + ) + logger.info(f"Agent {agent_id} updated successfully.") + except ValidationError as e: + logger.error(f"Validation error: {e}") + raise + + def get(self, agent_id: str) -> Agent: + """ + Retrieves an agent from the registry. + + Args: + agent_id (str): The unique identifier for the agent to retrieve. + + Returns: + Agent: The agent associated with the given agent_id. + + Raises: + KeyError: If the agent_id does not exist in the registry. + """ + with self.lock: + try: + agent = self.agents[agent_id].agent + logger.info(f"Agent {agent_id} retrieved successfully.") + return agent + except KeyError as e: + logger.error(f"Error: {e}") + raise + + def list_agents(self) -> List[str]: + """ + Lists all agent identifiers in the registry. + + Returns: + List[str]: A list of all agent identifiers. + """ + try: + with self.lock: + agent_ids = list(self.agents.keys()) + logger.info("Listing all agents.") + return agent_ids + except Exception as e: + report_error(e) + raise e + + def query( + self, condition: Optional[Callable[[Agent], bool]] = None + ) -> List[Agent]: + """ + Queries agents based on a condition. + + Args: + condition (Optional[Callable[[Agent], bool]]): A function that takes an agent and returns a boolean indicating + whether the agent meets the condition. + + Returns: + List[Agent]: A list of agents that meet the condition. 
+ """ + try: + with self.lock: + if condition is None: + agents = [ + agent_model.agent + for agent_model in self.agents.values() + ] + logger.info("Querying all agents.") + return agents + + agents = [ + agent_model.agent + for agent_model in self.agents.values() + if condition(agent_model.agent) + ] + logger.info("Querying agents with condition.") + return agents + except Exception as e: + report_error(e) + raise e + + def find_agent_by_name(self, agent_name: str) -> Agent: + try: + for agent_model in self.agents.values(): + if agent_model.agent.agent_name == agent_name: + return agent_model.agent + except Exception as e: + report_error(e) + raise e diff --git a/swarms/structs/concurrent_workflow.py b/swarms/structs/concurrent_workflow.py index 61936f6d..1212d10d 100644 --- a/swarms/structs/concurrent_workflow.py +++ b/swarms/structs/concurrent_workflow.py @@ -1,164 +1,115 @@ -import concurrent.futures -from dataclasses import dataclass, field -from typing import Callable, Dict, List, Optional - -from swarms.structs.task import Task -from swarms.utils.logger import logger +import threading +from queue import Queue +from typing import List from swarms.structs.agent import Agent -from swarms.structs.base_workflow import BaseWorkflow - - -@dataclass -class ConcurrentWorkflow(BaseWorkflow): - """ - ConcurrentWorkflow class for running a set of tasks concurrently using N number of autonomous agents. - - Args: - max_workers (int): The maximum number of workers to use for the ThreadPoolExecutor. - autosave (bool): Whether to save the state of the workflow to a file. Default is False. - saved_state_filepath (str): The filepath to save the state of the workflow to. Default is "runs/concurrent_workflow.json". - print_results (bool): Whether to print the results of each task. Default is False. - return_results (bool): Whether to return the results of each task. Default is False. - use_processes (bool): Whether to use processes instead of threads. Default is False. - - Examples: - >>> from swarms.models import OpenAIChat - >>> from swarms.structs import ConcurrentWorkflow - >>> llm = OpenAIChat(openai_api_key="") - >>> workflow = ConcurrentWorkflow(max_workers=5) - >>> workflow.add("What's the weather in miami", llm) - >>> workflow.add("Create a report on these metrics", llm) - >>> workflow.run() - >>> workflow.tasks - """ - - task_pool: List[Dict] = field(default_factory=list) - max_loops: int = 1 - max_workers: int = 5 - autosave: bool = False - agents: List[Agent] = None - saved_state_filepath: Optional[str] = "runs/concurrent_workflow.json" - print_results: bool = False - return_results: bool = False - use_processes: bool = False - stopping_condition: Optional[Callable] = None - - def add( - self, - task: Task = None, - agent: Agent = None, - tasks: List[Task] = None, - ): - """Adds a task to the workflow. +from swarms.utils.loguru_logger import logger +from dotenv import load_dotenv +import os +from swarms.models.popular_llms import OpenAIChat + + +class ConcurrentWorkflow: + def __init__(self, agents: List[Agent], max_loops: int): + """ + Initializes the ConcurrentWorkflow with the given parameters. Args: - task (Task): _description_ - tasks (List[Task]): _description_ + agents (List[Agent]): The list of agents to initialize. + max_loops (int): The maximum number of loops each agent can run. 
+ """ + self.max_loops = max_loops + self.agents = agents + self.num_agents = len(agents) + self.output_queue = Queue() + + def run_agent(self, agent: Agent, task: str) -> None: + """ + Runs a given agent on the specified task once. + + Args: + agent (Agent): The agent to run. + task (str): The task for the agent to execute. """ try: - if tasks: - for task in tasks: - self.task_pool.append(task) - logger.info( - f"Added task {task} to ConcurrentWorkflow." - ) + logger.info(f"Running agent {agent} on task '{task}'") + result = agent.run(task) + logger.info( + f"Agent {agent} completed task with result: {result}" + ) + + if result is None: + raise ValueError("Received None as result") + + self.output_queue.put(result) + except Exception as e: + logger.error(f"Error running agent {agent}: {e}") + self.output_queue.put(f"Error: {e}") + + def process_agent_outputs(self, task: str) -> None: + """ + Processes outputs from agents and conditionally sends them to other agents. + + Args: + task (str): The task for the agents to execute. + """ + while not self.output_queue.empty(): + result = self.output_queue.get() + if isinstance(result, str) and result.startswith("Error:"): + logger.error(result) else: - if task: - self.task_pool.append(task) - logger.info( - f"Added task {task} to ConcurrentWorkflow." - ) - - if agent: - self.agents.append(agent) - logger.info(f"Added agent {agent} to ConcurrentWorkflow.") - except Exception as error: - logger.warning(f"[ERROR][ConcurrentWorkflow] {error}") - raise error - - def run(self, task: str = None, *args, **kwargs): + logger.info(f"Processing result: {result}") + for next_agent in self.agents: + self.run_agent(next_agent, task) + + def run(self, task: str) -> str: """ - Executes the tasks in parallel using a ThreadPoolExecutor. + Runs a list of agents concurrently on the same task using threads. Args: - print_results (bool): Whether to print the results of each task. Default is False. - return_results (bool): Whether to return the results of each task. Default is False. + task (str): The task for the agents to execute. Returns: - List[Any]: A list of the results of each task, if return_results is True. Otherwise, returns None. + str: The final result of the concurrent execution. 
""" - loop = 0 - while loop < self.max_loops: - - if self.tasks is not None: - with concurrent.futures.ThreadPoolExecutor( - max_workers=self.max_workers - ) as executor: - futures = { - executor.submit(task.execute): task - for task in self.task_pool - } - results = [] - - for future in concurrent.futures.as_completed(futures): - task = futures[future] - try: - result = future.result() - if self.print_results: - logger.info(f"Task {task}: {result}") - if self.return_results: - results.append(result) - except Exception as e: - logger.error( - f"Task {task} generated an exception: {e}" - ) - - loop += 1 - if self.stopping_condition and self.stopping_condition( - results - ): - break - - elif self.agents is not None: - with concurrent.futures.ThreadPoolExecutor( - max_workers=self.max_workers - ) as executor: - futures = { - executor.submit(agent.run): agent - for agent in self.agents - } - results = [] - - for future in concurrent.futures.as_completed(futures): - agent = futures[future] - try: - result = future.result() - if self.print_results: - logger.info(f"Agent {agent}: {result}") - if self.return_results: - results.append(result) - except Exception as e: - logger.error( - f"Agent {agent} generated an exception: {e}" - ) - - loop += 1 - if self.stopping_condition and self.stopping_condition( - results - ): - break + threads = [] - else: - logger.warning("No tasks or agents found in the workflow.") - break + try: + for agent in self.agents: + thread = threading.Thread( + target=self.run_agent, args=(agent, task) + ) + thread.start() + threads.append(thread) + + for thread in threads: + thread.join() + + # self.process_agent_outputs(task) + except Exception as e: + logger.error(f"Error in concurrent workflow: {e}") + + return None + + +# Load the environment variables +load_dotenv() + +# Get the API key from the environment +api_key = os.environ.get("OPENAI_API_KEY") + +# Initialize the language model (assuming this should be done outside the class and passed to it) +llm = OpenAIChat(temperature=0.5, openai_api_key=api_key, max_tokens=4000) - return results if self.return_results else None +# Initialize agents +agents = [ + Agent(llm=llm, max_loops=1, autosave=True, dashboard=True) + for _ in range(1) +] - def list_tasks(self): - """Prints a list of the tasks in the workflow.""" - for task in self.task_pool: - logger.info(task) +# Task to be executed by each agent +task = "Generate a 10,000 word blog on health and wellness." - def save(self): - """Saves the state of the workflow to a file.""" - self.save_state(self.saved_state_filepath) +# Initialize and run the ConcurrentWorkflow +workflow = ConcurrentWorkflow(agents=agents, max_loops=1) +result = workflow.run(task) +logger.info(f"Final result: {result}") diff --git a/swarms/structs/sequential_workflow.py b/swarms/structs/sequential_workflow.py index 4576ffa1..48ebc20a 100644 --- a/swarms/structs/sequential_workflow.py +++ b/swarms/structs/sequential_workflow.py @@ -1,91 +1,46 @@ -import time -import json - +from typing import List +from swarms import Agent from swarms.utils.loguru_logger import logger -from swarms.structs.base_workflow import BaseWorkflow -from pydantic import BaseModel, Field -from typing import List, Dict -from swarms.structs.agent import Agent +from swarms.structs.rearrange import AgentRearrange -class StepSequentialWorkflow(BaseModel): - agent_names: List[str] = Field( - ..., description="List of agent names to include in the workflow." 
- ) - max_loops: int = Field( - 1, description="Maximum number of loops to run the workflow." - ) - verbose: bool = Field( - False, description="Whether to log debug information." - ) - steps: Dict = Field( - ..., - description="Dictionary of steps for the workflow with each agent and its parameters.", - ) - time: str = Field( - time.strftime("%Y-%m-%d %H:%M:%S"), - description="Time of the workflow.", - ) +class SequentialWorkflow: + """ + Initializes a SequentialWorkflow object. + Args: + agents (List[Agent], optional): The list of agents in the workflow. Defaults to None. + max_loops (int, optional): The maximum number of loops to execute the workflow. Defaults to 1. + *args: Variable length argument list. + **kwargs: Arbitrary keyword arguments. + """ -# Define a class to handle the sequential workflow -class SequentialWorkflow(BaseWorkflow): def __init__( self, agents: List[Agent] = None, - max_loops: int = 2, - verbose: bool = False, + max_loops: int = 1, *args, **kwargs, ): - """ - Initializes a SequentialWorkflow with a list of agents. - - :param agents: List of agents to include in the workflow. - """ self.agents = agents - self.max_loops = max_loops - - if verbose: - logger.add("sequential_workflow.log", level="DEBUG") - - if not self.agents: - raise ValueError("No agents provided for workflow") - - if not self.max_loops: - self.max_loops = 1 - - # Log all the agents in the workflow - logger.info( - f"Initialized SequentialWorkflow with agents: {json.dumps([str(agent.agent_name) for agent in self.agents])}" + self.flow = " -> ".join(agent.agent_name for agent in agents) + self.agent_rearrange = AgentRearrange( + agents, self.flow, max_loops=max_loops, *args, **kwargs ) - def run(self, task: str, *args, **kwargs): + def run(self, task: str) -> str: """ - Run the workflow starting with an initial task. + Runs the task through the agents in the dynamically constructed flow. + + Args: + task (str): The task for the agents to execute. - :param task: The task to start the workflow. + Returns: + str: The final result after processing through all agents. 
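+
+        Example (illustrative; agent_one and agent_two stand in for
+        already-configured Agent instances):
+            >>> workflow = SequentialWorkflow(agents=[agent_one, agent_two], max_loops=1)
+            >>> result = workflow.run("Research the topic, then write a summary.")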
""" - logger.info(f"Starting workflow with task: {task}") - current_output = task - for agent in self.agents: - count = 0 - while count < self.max_loops: - try: - logger.info(f"Running agent {agent.agent_name}") - current_output = agent.run( - current_output, *args, **kwargs - ) - print(current_output) - count += 1 - logger.debug( - f"Agent {agent.agent_name} completed loop {count} " - ) # Log partial output for brevity - except Exception as e: - logger.error( - f"Error occurred while running agent {agent.agent_name}: {str(e)}" - ) - raise - logger.info(f"Finished running agent {agent.agent_name}") - logger.info("Finished running workflow") - return current_output + try: + logger.info(f"Running task with dynamic flow: {self.flow}") + return self.agent_rearrange.run(task) + except Exception as e: + logger.error(f"An error occurred while running the task: {e}") + raise diff --git a/swarms/tools/py_func_to_openai_func_str.py b/swarms/tools/py_func_to_openai_func_str.py index 347cf44b..4da36e31 100644 --- a/swarms/tools/py_func_to_openai_func_str.py +++ b/swarms/tools/py_func_to_openai_func_str.py @@ -372,7 +372,7 @@ def get_openai_function_schema_from_func( function: Callable[..., Any], *, name: Optional[str] = None, - description: str, + description: str = None, ) -> Dict[str, Any]: """Get a JSON schema for a function as defined by the OpenAI API diff --git a/swarms/utils/successful_run.py b/swarms/utils/successful_run.py new file mode 100644 index 00000000..eac261da --- /dev/null +++ b/swarms/utils/successful_run.py @@ -0,0 +1,73 @@ +from loguru import logger +import sys +import platform +import os +import datetime + +# Configuring loguru to log to both the console and a file +logger.remove() # Remove default logger configuration +logger.add( + sys.stderr, + level="INFO", + format="{time} - {level} - {message}", +) + +logger.add("info.log", level="INFO", format="{time} - {level} - {message}") + + +def log_success_message() -> None: + """ + Logs a success message with instructions for sharing agents on the Swarms Agent Explorer and joining the community for assistance. + + Returns: + None + + Raises: + None + """ + # Gather extensive context information + context_info = { + "timestamp": datetime.datetime.now().isoformat(), + "python_version": platform.python_version(), + "platform": platform.platform(), + "machine": platform.machine(), + "processor": platform.processor(), + "user": os.getenv("USER") or os.getenv("USERNAME"), + "current_working_directory": os.getcwd(), + } + + success_message = ( + f"\n" + f"#########################################\n" + f"# #\n" + f"# SUCCESSFUL RUN DETECTED! 
#\n" + f"# #\n" + f"#########################################\n" + f"\n" + f"Your task completed successfully!\n" + f"\n" + f"Context Information:\n" + f"-----------------------------------------\n" + f"Timestamp: {context_info['timestamp']}\n" + f"Python Version: {context_info['python_version']}\n" + f"Platform: {context_info['platform']}\n" + f"Machine: {context_info['machine']}\n" + f"Processor: {context_info['processor']}\n" + f"User: {context_info['user']}\n" + f"Current Working Directory: {context_info['current_working_directory']}\n" + f"-----------------------------------------\n" + f"\n" + f"Share your agents on the Swarms Agent Explorer with friends:\n" + f"https://swarms.world/platform/explorer\n" + f"\n" + f"Join the Swarms community if you want assistance or help debugging:\n" + f"https://discord.gg/uzu63HQx\n" + f"\n" + f"#########################################\n" + ) + + logger.info(success_message) + + +# Example usage: +log_success_message()