From 5728bff63a4e423e51f08c50d46a0fb804bc424d Mon Sep 17 00:00:00 2001 From: Kye Date: Mon, 22 Apr 2024 20:54:22 -0400 Subject: [PATCH] [FEAT][Agent][Structured Outputs] --- .../agent_with_basemodel_output_type.py | 54 +++ playground/notest.txt | 98 ++++++ playground/swarms/auto_swarm_example.py | 7 + playground/swarms/build_a_swarm.py | 2 +- playground/swarms/swarm_example.py | 2 +- pyproject.toml | 13 +- requirements.txt | 1 + swarms/models/__init__.py | 3 - swarms/models/popular_llms.py | 18 +- swarms/structs/__init__.py | 3 - swarms/structs/agent.py | 203 +++++++++-- swarms/structs/agent_process.py | 4 +- swarms/structs/auto_swarm.py | 64 ---- swarms/structs/autoscaler.py | 329 ------------------ swarms/structs/base_structure.py | 27 +- swarms/structs/base_swarm.py | 108 ++---- swarms/structs/blockdevice.py | 25 -- .../structs/{blocksdict.py => blocks_dict.py} | 0 .../structs/{blockslist.py => blocks_list.py} | 0 swarms/structs/conversation.py | 6 +- swarms/structs/groupchat.py | 7 +- swarms/structs/load_balancer.py | 70 ---- swarms/structs/message.py | 18 +- swarms/structs/multi_agent_collab.py | 66 +--- swarms/structs/omni_agent_types.py | 4 +- swarms/structs/schemas.py | 24 +- swarms/structs/sequential_workflow.py | 30 -- swarms/structs/swarm_redis_registry.py | 176 ---------- swarms/tools/__init__.py | 12 + swarms/tools/pydantic_to_json.py | 134 +++++++ 30 files changed, 596 insertions(+), 912 deletions(-) create mode 100644 playground/agents/agent_with_basemodel_output_type.py create mode 100644 playground/notest.txt create mode 100644 playground/swarms/auto_swarm_example.py delete mode 100644 swarms/structs/autoscaler.py delete mode 100644 swarms/structs/blockdevice.py rename swarms/structs/{blocksdict.py => blocks_dict.py} (100%) rename swarms/structs/{blockslist.py => blocks_list.py} (100%) delete mode 100644 swarms/structs/load_balancer.py delete mode 100644 swarms/structs/swarm_redis_registry.py create mode 100644 swarms/tools/pydantic_to_json.py diff --git a/playground/agents/agent_with_basemodel_output_type.py b/playground/agents/agent_with_basemodel_output_type.py new file mode 100644 index 00000000..541211bd --- /dev/null +++ b/playground/agents/agent_with_basemodel_output_type.py @@ -0,0 +1,54 @@ +from pydantic import BaseModel, Field +from swarms import Anthropic +from swarms import Agent + + +# Initialize the schema for the person's information +class Schema(BaseModel): + name: str = Field(..., title="Name of the person") + agent: int = Field(..., title="Age of the person") + is_student: bool = Field(..., title="Whether the person is a student") + courses: list[str] = Field( + ..., title="List of courses the person is taking" + ) + +# Convert the schema to a JSON string +tool_schema = Schema( + name="Tool Name", + agent=1, + is_student=True, + courses=["Course1", "Course2"], +) + +# Define the task to generate a person's information +task = "Generate a person's information based on the following schema:" + +# Initialize the agent +agent = Agent( + agent_name="Person Information Generator", + system_prompt=( + "Generate a person's information based on the following schema:" + ), + # Set the tool schema to the JSON string -- this is the key difference + tool_schema=tool_schema, + llm=Anthropic(), + max_loops=3, + autosave=True, + dashboard=False, + streaming_on=True, + verbose=True, + interactive=True, + # Set the output type to the tool schema which is a BaseModel + output_type=tool_schema, # or dict, or str + metadata_output_type="json", + # List of schemas that the agent can handle + list_tool_schemas = [tool_schema], + function_calling_format_type = "OpenAI", + function_calling_type = "json" # or soon yaml +) + +# Run the agent to generate the person's information +generated_data = agent.run(task) + +# Print the generated data +print(f"Generated data: {generated_data}") diff --git a/playground/notest.txt b/playground/notest.txt new file mode 100644 index 00000000..62894bab --- /dev/null +++ b/playground/notest.txt @@ -0,0 +1,98 @@ +swarms + +pip install swarms +swarms is the most pythonic way of writing cognitive systems. Leveraging pydantic models as output schemas combined with langchain in the backend allows for a seamless integration of llms into your apps. It utilizes OpenAI Functions or LlamaCpp grammars (json-schema-mode) for efficient structured output. In the backend it compiles the swarms syntax into langchain runnables so you can easily invoke, stream or batch process your pipelines. + +Open in GitHub Codespaces + +from pydantic import BaseModel, Field +from swarms import Anthropic +from swarms import Agent + + +# Initialize the schema for the person's information +class Schema(BaseModel): + name: str = Field(..., title="Name of the person") + agent: int = Field(..., title="Age of the person") + is_student: bool = Field(..., title="Whether the person is a student") + courses: list[str] = Field( + ..., title="List of courses the person is taking" + ) + +# Convert the schema to a JSON string +tool_schema = Schema( + name="Tool Name", + agent=1, + is_student=True, + courses=["Course1", "Course2"], +) + +# Define the task to generate a person's information +task = "Generate a person's information based on the following schema:" + +# Initialize the agent +agent = Agent( + agent_name="Person Information Generator", + system_prompt=( + "Generate a person's information based on the following schema:" + ), + # Set the tool schema to the JSON string -- this is the key difference + tool_schema=tool_schema, + llm=Anthropic(), + max_loops=3, + autosave=True, + dashboard=False, + streaming_on=True, + verbose=True, + interactive=True, + # Set the output type to the tool schema which is a BaseModel + output_type=tool_schema, # or dict, or str + metadata_output_type="json", + # List of schemas that the agent can handle + list_tool_schemas = [tool_schema], + function_calling_format_type = "OpenAI", + function_calling_type = "json" # or soon yaml +) + +# Run the agent to generate the person's information +generated_data = agent.run(task) + +# Print the generated data +print(f"Generated data: {generated_data}") + + + + + + +Features +🐍 pythonic +🔀 easy swap between openai or local models +🔄 dynamic output types (pydantic models, or primitives) +👁️ vision llm support +🧠 langchain_core as backend +📝 jinja templating for prompts +🏗️ reliable structured output +🔁 auto retry parsing +🔧 langsmith support +🔄 sync, async, streaming, parallel, fallbacks +📦 gguf download from huggingface +✅ type hints for all functions and mypy support +🗣️ chat router component +🧩 composable with langchain LCEL +🛠️ easy error handling +🚦 enums and literal support +📐 custom parsing types +Documentation +Checkout the docs here 👈 + +Also highly recommend to try and run the examples in the ./examples folder. + +Contribution +You want to contribute? Thanks, that's great! For more information checkout the Contributing Guide. Please run the dev setup to get started: + +git clone https://github.com/kyegomez/swarms.git && cd swarms + +./dev_setup.sh +About +⛓️ build cognitive systems, pythonic diff --git a/playground/swarms/auto_swarm_example.py b/playground/swarms/auto_swarm_example.py new file mode 100644 index 00000000..049d512c --- /dev/null +++ b/playground/swarms/auto_swarm_example.py @@ -0,0 +1,7 @@ +from swarms import AutoSwarm + +# Initialize the swarm +swarm = AutoSwarm("kyegomez/myswarm") + +# Run the swarm +swarm.run("Generate a 10,000 word blog on health and wellness.") diff --git a/playground/swarms/build_a_swarm.py b/playground/swarms/build_a_swarm.py index b28ba146..c17469ed 100644 --- a/playground/swarms/build_a_swarm.py +++ b/playground/swarms/build_a_swarm.py @@ -1,4 +1,4 @@ -from swarms import BaseSwarm, AutoSwarm, AutoSwarmRouter, Agent, Anthropic +from swarms import BaseSwarm, Agent, Anthropic class MarketingSwarm(BaseSwarm): diff --git a/playground/swarms/swarm_example.py b/playground/swarms/swarm_example.py index d170c5b2..e6f5980b 100644 --- a/playground/swarms/swarm_example.py +++ b/playground/swarms/swarm_example.py @@ -1,4 +1,4 @@ -from swarms import AutoSwarm, AutoSwarmRouter, BaseSwarm, Agent, Anthropic +from swarms import BaseSwarm, Agent, Anthropic class MySwarm(BaseSwarm): diff --git a/pyproject.toml b/pyproject.toml index 7154b0fc..467feb1b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "4.8.4" +version = "4.8.6" description = "Swarms - Pytorch" license = "MIT" authors = ["Kye Gomez "] @@ -51,6 +51,7 @@ python-dotenv = "*" accelerate = "0.28.0" opencv-python = "^4.9.0.80" yaml = "*" +docstring_parser = "0.16" [tool.poetry.group.lint.dependencies] black = "^23.1.0" @@ -70,16 +71,6 @@ fastapi = "^0.110.1" [tool.ruff] line-length = 75 -[tool.ruff.lint] -select = ["E", "F", "W", "I", "UP"] -ignore = [] -fixable = ["ALL"] -unfixable = [] -preview = true - -[tool.ruff.lint.per-file-ignores] -"swarms/prompts/**.py" = ["E501"] - [tool.black] target-version = ["py38"] line-length = 75 diff --git a/requirements.txt b/requirements.txt index 9b7843ae..2fe50821 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,3 +16,4 @@ Pillow==10.2.0 termcolor==2.2.0 psutil sentry-sdk +docstring_parser==0.16 \ No newline at end of file diff --git a/swarms/models/__init__.py b/swarms/models/__init__.py index 4637d332..40e72831 100644 --- a/swarms/models/__init__.py +++ b/swarms/models/__init__.py @@ -29,9 +29,6 @@ from swarms.models.popular_llms import ( from swarms.models.popular_llms import ( OpenAILLM as OpenAI, ) -from swarms.models.popular_llms import ( - ReplicateLLM as Replicate, -) from swarms.models.popular_llms import OctoAIChat from swarms.models.qwen import QwenVLMultiModal # noqa: E402 diff --git a/swarms/models/popular_llms.py b/swarms/models/popular_llms.py index 2f043445..02b20234 100644 --- a/swarms/models/popular_llms.py +++ b/swarms/models/popular_llms.py @@ -4,15 +4,12 @@ from langchain_community.chat_models.azure_openai import ( from langchain_community.chat_models.openai import ( ChatOpenAI as OpenAIChat, ) -from langchain_community.llms import ( - Anthropic, - Cohere, - MosaicML, - OpenAI, - Replicate, -) +from langchain.llms.anthropic import Anthropic +from langchain.llms.cohere import Cohere +from langchain.llms.mosaicml import MosaicML +from langchain.llms.openai import OpenAI #, OpenAIChat, AzureOpenAI from langchain_community.llms.octoai_endpoint import OctoAIEndpoint - +from langchain.llms.replicate import Replicate class AnthropicChat(Anthropic): def __call__(self, *args, **kwargs): @@ -34,7 +31,7 @@ class OpenAILLM(OpenAI): return self.invoke(*args, **kwargs) -class ReplicateLLM(Replicate): +class ReplicateChat(Replicate): def __call__(self, *args, **kwargs): return self.invoke(*args, **kwargs) @@ -45,6 +42,9 @@ class AzureOpenAILLM(AzureChatOpenAI): class OpenAIChatLLM(OpenAIChat): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + def __call__(self, *args, **kwargs): return self.invoke(*args, **kwargs) diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index d66655b2..30657f02 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -5,7 +5,6 @@ from swarms.structs.agent_process import ( AgentProcessQueue, ) from swarms.structs.auto_swarm import AutoSwarm, AutoSwarmRouter -from swarms.structs.autoscaler import AutoScaler from swarms.structs.base_structure import BaseStructure from swarms.structs.base_swarm import BaseSwarm from swarms.structs.base_workflow import BaseWorkflow @@ -87,7 +86,6 @@ from swarms.structs.yaml_model import ( YamlModel, ) - __all__ = [ "Agent", "AgentJob", @@ -95,7 +93,6 @@ __all__ = [ "AgentProcessQueue", "AutoSwarm", "AutoSwarmRouter", - "AutoScaler", "BaseStructure", "BaseSwarm", "BaseWorkflow", diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 2b079c9d..a758db6a 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -1,6 +1,7 @@ import asyncio import json import logging +from typing import Union import os import random import sys @@ -26,6 +27,12 @@ from swarms.utils.pdf_to_text import pdf_to_text from swarms.tools.exec_tool import execute_tool_by_name from swarms.tools.code_executor import CodeExecutor from swarms.prompts.worker_prompt import tool_usage_worker_prompt +from pydantic import BaseModel +from swarms.tools.pydantic_to_json import ( + pydantic_to_functions, + multi_pydantic_to_functions, +) +from swarms.structs.schemas import Step, ManySteps # Utils @@ -63,13 +70,17 @@ def step_id(): return str(uuid.uuid1()) +# Agent output types +agent_output_type = Union[BaseModel, dict, str] +ToolUsageType = Union[BaseModel, Dict[str, Any]] + + +# [FEAT][AGENT] class Agent: """ Agent is the backbone to connect LLMs with tools and long term memory. Agent also provides the ability to ingest any type of docs like PDFs, Txts, Markdown, Json, and etc for the agent. Here is a list of features. - - Args: llm (Any): The language model to use template (str): The template to use @@ -155,22 +166,22 @@ class Agent: def __init__( self, - id: str = agent_id, - llm: Any = None, + id: Optional[str] = agent_id, + llm: Optional[Any] = None, template: Optional[str] = None, max_loops: Optional[int] = 1, stopping_condition: Optional[Callable[[str], bool]] = None, - loop_interval: int = 0, - retry_attempts: int = 3, - retry_interval: int = 1, - return_history: bool = False, - stopping_token: str = None, + loop_interval: Optional[int] = 0, + retry_attempts: Optional[int] = 3, + retry_interval: Optional[int] = 1, + return_history: Optional[bool] = False, + stopping_token: Optional[str] = None, dynamic_loops: Optional[bool] = False, - interactive: bool = False, - dashboard: bool = False, - agent_name: str = "swarm-worker-01", - agent_description: str = None, - system_prompt: str = AGENT_SYSTEM_PROMPT_3, + interactive: Optional[bool] = False, + dashboard: Optional[bool] = False, + agent_name: Optional[str] = "swarm-worker-01", + agent_description: Optional[str] = None, + system_prompt: Optional[str] = AGENT_SYSTEM_PROMPT_3, tools: List[BaseTool] = [], dynamic_temperature_enabled: Optional[bool] = False, sop: Optional[str] = None, @@ -178,7 +189,7 @@ class Agent: saved_state_path: Optional[str] = None, autosave: Optional[bool] = False, context_length: Optional[int] = 8192, - user_name: str = "Human:", + user_name: Optional[str] = "Human:", self_healing_enabled: Optional[bool] = False, code_interpreter: Optional[bool] = False, multi_modal: Optional[bool] = None, @@ -187,22 +198,22 @@ class Agent: tokenizer: Optional[Any] = None, long_term_memory: Optional[AbstractVectorDatabase] = None, preset_stopping_token: Optional[bool] = False, - traceback: Any = None, - traceback_handlers: Any = None, + traceback: Optional[Any] = None, + traceback_handlers: Optional[Any] = None, streaming_on: Optional[bool] = False, docs: List[str] = None, - docs_folder: str = None, - verbose: bool = False, + docs_folder: Optional[str] = None, + verbose: Optional[bool] = False, parser: Optional[Callable] = None, best_of_n: Optional[int] = None, callback: Optional[Callable] = None, metadata: Optional[Dict[str, Any]] = None, callbacks: Optional[List[Callable]] = None, - logger_handler: Any = sys.stderr, + logger_handler: Optional[Any] = sys.stderr, search_algorithm: Optional[Callable] = None, logs_to_filename: Optional[str] = None, evaluator: Optional[Callable] = None, - output_json: bool = False, + output_json: Optional[bool] = False, stopping_func: Optional[Callable] = None, custom_loop_condition: Optional[Callable] = None, sentiment_threshold: Optional[float] = None, @@ -210,6 +221,13 @@ class Agent: sentiment_analyzer: Optional[Callable] = None, limit_tokens_from_string: Optional[Callable] = None, custom_tools_prompt: Optional[Callable] = None, + tool_schema: ToolUsageType = None, + output_type: agent_output_type = None, + function_calling_type: str = "json", + output_cleaner: Optional[Callable] = None, + function_calling_format_type: Optional[str] = "OpenAI", + list_tool_schemas: Optional[List[BaseModel]] = None, + metadata_output_type: str = "json", *args, **kwargs, ): @@ -270,6 +288,13 @@ class Agent: self.custom_exit_command = custom_exit_command self.sentiment_analyzer = sentiment_analyzer self.limit_tokens_from_string = limit_tokens_from_string + self.tool_schema = tool_schema + self.output_type = output_type + self.function_calling_type = function_calling_type + self.output_cleaner = output_cleaner + self.function_calling_format_type = function_calling_format_type + self.list_tool_schemas = list_tool_schemas + self.metadata_output_type = metadata_output_type # The max_loops will be set dynamically if the dynamic_loop if self.dynamic_loops: @@ -293,11 +318,15 @@ class Agent: ) # If the preset stopping token is enabled then set the stopping token to the preset stopping token - if preset_stopping_token: + if preset_stopping_token is not None: self.stopping_token = "" + # If the stopping function is provided then set the stopping condition to the stopping function self.short_memory = Conversation( - system_prompt=self.system_prompt, time_enabled=True + system_prompt=system_prompt, + time_enabled=True, + *args, + **kwargs ) # If the docs exist then ingest the docs @@ -359,6 +388,30 @@ class Agent: # logger.info("Creating Agent {}".format(self.agent_name)) + # If the tool types + if self.tool_schema is not None: + logger.info("Tool schema provided") + tool_schema_str = self.tool_schema_to_str(self.tool_schema) + + print(tool_schema_str) + + # Add to the short memory + logger.info(f"Adding tool schema to memory: {tool_schema_str}") + self.short_memory.add( + role=self.user_name, content=tool_schema_str + ) + + # If a list of tool schemas: + if self.list_tool_schemas is not None: + logger.info("Tool schema provided") + tool_schema_str = self.tool_schemas_to_str(list_tool_schemas) + + # Add to the short memory + logger.info(f"Adding tool schema to memory: {tool_schema_str}") + self.short_memory.add( + role=self.user_name, content=tool_schema_str + ) + def set_system_prompt(self, system_prompt: str): """Set the system prompt""" self.system_prompt = system_prompt @@ -522,6 +575,78 @@ class Agent: for chunk in content: print(chunk, end="") + ########################## FUNCTION CALLING ########################## + + def json_str_to_json(self, json_str: str): + """Convert a JSON string to a JSON object""" + return json.loads(json_str) + + def json_str_to_pydantic_model(self, json_str: str, model: BaseModel): + """Convert a JSON string to a Pydantic model""" + return model.model_validate_json(json_str) + + def json_str_to_dict(self, json_str: str): + """Convert a JSON string to a dictionary""" + return json.loads(json_str) + + def pydantic_model_to_json_str(self, model: BaseModel): + return str(pydantic_to_functions(model)) + + def dict_to_json_str(self, dictionary: dict): + """Convert a dictionary to a JSON string""" + return json.dumps(dictionary) + + def dict_to_pydantic_model(self, dictionary: dict, model: BaseModel): + """Convert a dictionary to a Pydantic model""" + return model.model_validate_json(dictionary) + + # def prep_pydantic_model_for_str(self, model: BaseModel): + # # Convert to Function + # out = self.pydantic_model_to_json_str(model) + + # # return function_to_str(out) + + def tool_schema_to_str( + self, tool_schema: BaseModel = None, *args, **kwargs + ): + """Convert a tool schema to a string""" + out = pydantic_to_functions(tool_schema) + return str(out) + + def tool_schemas_to_str( + self, tool_schemas: List[BaseModel] = None, *args, **kwargs + ): + """Convert a list of tool schemas to a string""" + out = multi_pydantic_to_functions(tool_schemas) + return str(out) + + def str_to_pydantic_model(self, string: str, model: BaseModel): + """Convert a string to a Pydantic model""" + return model.model_validate_json(string) + + def list_str_to_pydantic_model( + self, list_str: List[str], model: BaseModel + ): + """Convert a list of strings to a Pydantic model""" + # return model.model_validate_json(list_str) + for string in list_str: + return model.model_validate_json(string) + + def prepare_output_for_output_model( + self, output: agent_output_type = None + ): + """Prepare the output for the output model""" + if self.output_type == BaseModel: + return self.str_to_pydantic_model(output, self.output_type) + elif self.output_type == dict: + return self.dict_to_json_str(output) + elif self.output_type == str: + return output + else: + return output + + ########################## FUNCTION CALLING ########################## + def _history(self, user_name: str, task: str) -> str: """Generate the history for the history prompt @@ -567,6 +692,7 @@ class Agent: loop_count = 0 response = None + step_pool = [] while ( self.max_loops == "auto" @@ -722,10 +848,37 @@ class Agent: ) time.sleep(self.loop_interval) + # Save Step Metadata + active_step = Step( + task_id=task_id(), + step_id=loop_count, + name=task, + output=response, + max_loops=self.max_loops, + ) + + step_pool.append(active_step) + + # Save the step pool + # self.step_cache = step_pool + if self.autosave: logger.info("Autosaving agent state.") self.save_state(self.saved_state_path) + # Apply the cleaner function to the response + if self.output_cleaner is not None: + response = self.output_cleaner(response) + + # Prepare the output for the output model + if self.output_type is not None: + response = self.prepare_output_for_output_model(response) + + # List of steps for this task + ManySteps(task_id=task_id(), steps=step_pool) + + # Save Many steps + return response except Exception as error: print(f"Error running agent: {error}") @@ -790,7 +943,7 @@ class Agent: Returns: str: The agent history prompt """ - ltr = str(self.long_term_memory.query(query), *args, **kwargs) + ltr = self.long_term_memory.query(query, *args, **kwargs) context = f""" System: This reminds you of these events from your past: [{ltr}] @@ -1195,7 +1348,7 @@ class Agent: def reset(self): """Reset the agent""" - self.short_memory = {} + self.short_memory = None def run_code(self, code: str): """ diff --git a/swarms/structs/agent_process.py b/swarms/structs/agent_process.py index d1931027..711f152a 100644 --- a/swarms/structs/agent_process.py +++ b/swarms/structs/agent_process.py @@ -2,7 +2,7 @@ from datetime import datetime from pydantic import BaseModel -from swarms.structs.omni_agent_types import agents +from swarms.structs.omni_agent_types import AgentListType from swarms.utils.loguru_logger import logger from typing import Callable @@ -54,7 +54,7 @@ class AgentProcessQueue: [] ) # Currently use list to simulate queue - def add(self, agents: agents): + def add(self, agents: AgentListType): """ Adds an agent process to the queue. diff --git a/swarms/structs/auto_swarm.py b/swarms/structs/auto_swarm.py index 717829dc..c8f05b08 100644 --- a/swarms/structs/auto_swarm.py +++ b/swarms/structs/auto_swarm.py @@ -1,71 +1,9 @@ from typing import Any, Callable, Dict, Optional, Sequence -from swarms.models.base_llm import AbstractLLM from swarms.structs.base_swarm import BaseSwarm from swarms.utils.loguru_logger import logger -class SequentialAccountingSwarm(BaseSwarm): - """SequentialAccountingSwarm class represents a swarm of agents that can be created automatically. - - Flow: - name -> router -> swarm entry point - - Attributes: - name (str, optional): The name of the swarm. Defaults to "kyegomez/sequential-accounting-swarm". - description (str, optional): The description of the swarm. Defaults to None. - verbose (bool): Whether to enable verbose mode or not. Defaults to False. - custom_params (dict, optional): Custom parameters for the swarm. Defaults to None. - iters (int, optional): The number of iterations for the swarm simulation. Defaults to 100. - max_agents (int, optional): The maximum number of agents in the swarm. Defaults to 100. - agents (Sequence[AbstractLLM], optional): The list of agents in the swarm. Defaults to None. - - Methods: - run(task: str = None, *args, **kwargs) -> Any: - Run the swarm simulation. - - """ - - def __init__( - self, - name: Optional[str] = "kyegomez/sequential-accounting-swarm", - description: Optional[str] = None, - verbose: bool = False, - custom_params: Optional[Dict[str, Any]] = None, - iters: Optional[int] = 100, - max_agents: Optional[int] = 100, - agents: Sequence[AbstractLLM] = None, - *args, - **kwargs, - ): - super().__init__() - self.name = name - self.description = description - self.verbose = verbose - self.custom_params = custom_params - self.iters = iters - self.max_agents = max_agents - self.agents = agents - - def run(self, task: str = None, *args, **kwargs): - """Run the swarm simulation. - - Args: - task (str, optional): The task to be performed by the agents. Defaults to None. - *args: Variable length argument list. - **kwargs: Arbitrary keyword arguments. - - Returns: - Any: The final result of the swarm simulation. - - """ - for agent in self.agents: - out = agent.run(task, *args, **kwargs) - final = agent.run(out) - - return final - - class AutoSwarmRouter(BaseSwarm): """AutoSwarmRouter class represents a router for the AutoSwarm class. @@ -175,7 +113,6 @@ class AutoSwarm(BaseSwarm): description: Optional[str] = None, verbose: bool = False, custom_params: Optional[Dict[str, Any]] = None, - router: Optional[AutoSwarmRouter] = None, custom_preprocess: Optional[Callable] = None, custom_postprocess: Optional[Callable] = None, custom_router: Optional[Callable] = None, @@ -187,7 +124,6 @@ class AutoSwarm(BaseSwarm): self.description = description self.verbose = verbose self.custom_params = custom_params - self.router = router self.custom_preprocess = custom_preprocess self.custom_postprocess = custom_postprocess self.router = AutoSwarmRouter( diff --git a/swarms/structs/autoscaler.py b/swarms/structs/autoscaler.py deleted file mode 100644 index c3a384e3..00000000 --- a/swarms/structs/autoscaler.py +++ /dev/null @@ -1,329 +0,0 @@ -import logging -import queue -import threading -from time import sleep -from typing import Callable, Dict, List, Optional - -from termcolor import colored - -from swarms.structs.agent import Agent -from swarms.structs.base_structure import BaseStructure -from swarms.utils.decorators import ( - error_decorator, - log_decorator, - timing_decorator, -) - - -class AutoScaler(BaseStructure): - """ - AutoScaler class - - The AutoScaler class is responsible for managing the agents pool - and the task queue. It also monitors the health of the agents and - scales the pool up or down based on the number of pending tasks - and the current load of the agents. - - Args: - initial_agents (Optional[int], optional): Initial number of - agents to start with. Defaults to 10. - scale_up_factor (int, optional): Factor by which to scale up - the agents pool. Defaults to 1. - idle_threshold (float, optional): Threshold for scaling down - the agents pool. Defaults to 0.2. - busy_threshold (float, optional): Threshold for scaling up - the agents pool. Defaults to 0.7. - agents (List[Agent], optional): List of agents to use in the - pool. Defaults to None. - autoscale (Optional[bool], optional): Whether to autoscale - the agents pool. Defaults to True. - min_agents (Optional[int], optional): Minimum number of - agents to keep in the pool. Defaults to 10. - max_agents (Optional[int], optional): Maximum number of - agents to keep in the pool. Defaults to 100. - custom_scale_strategy (Optional[Callable], optional): Custom - scaling strategy to use. Defaults to None. - - Methods: - add_task: Add tasks to queue - scale_up: Add more agents - scale_down: scale down - run: Run agent the task on the agent id - monitor_and_scale: Monitor and scale - start: Start scaling - check_agent_health: Checks the health of each agent and - replaces unhealthy agents. - balance_load: Distributes tasks among agents based on their - current load. - set_scaling_strategy: Set a custom scaling strategy. - execute_scaling_strategy: Execute the custom scaling strategy - if defined. - report_agent_metrics: Collects and reports metrics from each - agent. - report: Reports the current state of the autoscaler. - print_dashboard: Prints a dashboard of the current state of - the autoscaler. - - Examples: - >>> import os - >>> from dotenv import load_dotenv - >>> # Import the OpenAIChat model and the Agent struct - >>> from swarms.models import OpenAIChat - >>> from swarms.structs import Agent - >>> from swarms.structs.autoscaler import AutoScaler - >>> # Load the environment variables - >>> load_dotenv() - >>> # Get the API key from the environment - >>> api_key = os.environ.get("OPENAI_API_KEY") - >>> # Initialize the language model - >>> llm = OpenAIChat( - ... temperature=0.5, - ... openai_api_key=api_key, - ... ) - >>> ## Initialize the workflow - >>> agent = Agent(llm=llm, max_loops=1, dashboard=True) - >>> # Load the autoscaler - >>> autoscaler = AutoScaler( - ... initial_agents=2, - ... scale_up_factor=1, - ... idle_threshold=0.2, - ... busy_threshold=0.7, - ... agents=[agent], - ... autoscale=True, - ... min_agents=1, - ... max_agents=5, - ... custom_scale_strategy=None, - ... ) - >>> print(autoscaler) - >>> # Run the workflow on a task - >>> out = autoscaler.run(agent.id, "Generate a 10,000 word blog on health and wellness.") - >>> print(out) - - """ - - @log_decorator - @error_decorator - @timing_decorator - def __init__( - self, - initial_agents: Optional[int] = 10, - scale_up_factor: int = 1, - idle_threshold: float = 0.2, - busy_threshold: float = 0.7, - agents: List[Agent] = None, - autoscale: Optional[bool] = True, - min_agents: Optional[int] = 10, - max_agents: Optional[int] = 100, - custom_scale_strategy: Optional[Callable] = None, - *args, - **kwargs, - ): - self.agents_pool = agents or [ - agents[0]() for _ in range(initial_agents) - ] - self.task_queue = queue.Queue() - self.scale_up_factor = scale_up_factor - self.idle_threshold = idle_threshold - self.busy_threshold = busy_threshold - self.lock = threading.Lock() - self.agents = agents - self.autoscale = autoscale - self.min_agents = min_agents - self.max_agents = max_agents - self.custom_scale_strategy = custom_scale_strategy - - def add_task(self, task): - """Add tasks to queue""" - try: - self.task_queue.put(task) - except Exception as error: - print( - f"Error adding task to queue: {error} try again with" - " a new task" - ) - - @log_decorator - @error_decorator - @timing_decorator - def scale_up(self): - """Add more agents""" - try: - with self.lock: - new_agents_counts = ( - len(self.agents_pool) * self.scale_up_factor - ) - for _ in range(new_agents_counts): - self.agents_pool.append(self.agents[0]()) - except Exception as error: - print(f"Error scaling up: {error} try again with a new task") - - def scale_down(self): - """scale down""" - try: - with self.lock: - if ( - len(self.agents_pool) > 10 - ): # ensure minmum of 10 agents - del self.agents_pool[-1] # remove last agent - except Exception as error: - print( - f"Error scaling down: {error} try again with a new" " task" - ) - - def run(self, agent_id, task: Optional[str] = None, *args, **kwargs): - """Run agent the task on the agent id - - Args: - agent_id (_type_): _description_ - task (str, optional): _description_. Defaults to None. - - Raises: - ValueError: _description_ - - Returns: - _type_: _description_ - """ - for agent in self.agents_pool: - if agent.id == agent_id: - return agent.run(task, *args, **kwargs) - raise ValueError(f"No agent found with ID {agent_id}") - - @log_decorator - @error_decorator - @timing_decorator - def monitor_and_scale(self): - """Monitor and scale""" - try: - while True: - sleep(60) # check minute - pending_tasks = self.task_queue.qsize() - active_agents = sum( - [1 for agent in self.agents_pool if agent.is_busy()] - ) - - if ( - pending_tasks / len(self.agents_pool) - > self.busy_threshold - ): - self.scale_up() - elif ( - active_agents / len(self.agents_pool) - < self.idle_threshold - ): - self.scale_down() - except Exception as error: - print( - f"Error monitoring and scaling: {error} try again" - " with a new task" - ) - - @log_decorator - @error_decorator - @timing_decorator - def start(self): - """Start scaling""" - try: - monitor_thread = threading.Thread( - target=self.monitor_and_scale - ) - monitor_thread.start() - - while True: - task = self.task_queue.get() - if task: - available_agent = next( - agent for agent in self.agents_pool - ) - if available_agent: - available_agent.run(task) - except Exception as error: - print(f"Error starting: {error} try again with a new task") - - def check_agent_health(self): - """Checks the health of each agent and replaces unhealthy agents.""" - for i, agent in enumerate(self.agents_pool): - if not agent.is_healthy(): - logging.warning(f"Replacing unhealthy agent at index {i}") - self.agents_pool[i] = self.agent() - - def balance_load(self): - """Distributes tasks among agents based on their current load.""" - try: - while not self.task_queue.empty(): - for agent in self.agents_pool: - if agent.can_accept_task(): - task = self.task_queue.get() - agent.run(task) - except Exception as error: - print( - f"Error balancing load: {error} try again with a new" - " task" - ) - - def set_scaling_strategy(self, strategy: Callable[[int, int], int]): - """Set a custom scaling strategy.""" - self.custom_scale_strategy = strategy - - def execute_scaling_strategy(self): - """Execute the custom scaling strategy if defined.""" - try: - if hasattr(self, "custom_scale_strategy"): - scale_amount = self.custom_scale_strategy( - self.task_queue.qsize(), len(self.agents_pool) - ) - if scale_amount > 0: - for _ in range(scale_amount): - self.agents_pool.append(self.agent()) - elif scale_amount < 0: - for _ in range(abs(scale_amount)): - if len(self.agents_pool) > 10: - del self.agents_pool[-1] - except Exception as error: - print( - f"Error executing scaling strategy: {error} try again" - " with a new task" - ) - - def report_agent_metrics(self) -> Dict[str, List[float]]: - """Collects and reports metrics from each agent.""" - metrics = { - "completion_time": [], - "success_rate": [], - "error_rate": [], - } - for agent in self.agents_pool: - agent_metrics = agent.get_metrics() - for key in metrics.keys(): - metrics[key].append(agent_metrics.get(key, 0.0)) - return metrics - - def report(self): - """Reports the current state of the autoscaler.""" - self.check_agent_health() - self.balance_load() - self.execute_scaling_strategy() - metrics = self.report_agent_metrics() - print(metrics) - - def print_dashboard(self): - """Prints a dashboard of the current state of the autoscaler.""" - print( - colored( - f""" - - Autoscaler Dashboard - -------------------- - Agents: {len(self.agents_pool)} - Initial Agents: {self.initial_agents} - self.scale_up_factor: {self.scale_up_factor} - self.idle_threshold: {self.idle_threshold} - self.busy_threshold: {self.busy_threshold} - self.task_queue.qsize(): {self.task_queue.qsize()} - self.task_queue.empty(): {self.task_queue.empty()} - self.task_queue.full(): {self.task_queue.full()} - self.task_queue.maxsize: {self.task_queue.maxsize} - - """, - "cyan", - ) - ) diff --git a/swarms/structs/base_structure.py b/swarms/structs/base_structure.py index 5cc7f57e..7f67c4c6 100644 --- a/swarms/structs/base_structure.py +++ b/swarms/structs/base_structure.py @@ -12,10 +12,10 @@ try: import gzip except ImportError as error: print(f"Error importing gzip: {error}") -from pydantic import BaseModel +# from pydantic import BaseModel -class BaseStructure(BaseModel): +class BaseStructure: """Base structure. @@ -64,12 +64,23 @@ class BaseStructure(BaseModel): BaseStructure(name=None, description=None, save_metadata=True, save_artifact_path='./artifacts', save_metadata_path='./metadata', save_error_path='./errors') """ - name: Optional[str] = None - description: Optional[str] = None - save_metadata: bool = True - save_artifact_path: Optional[str] = "./artifacts" - save_metadata_path: Optional[str] = "./metadata" - save_error_path: Optional[str] = "./errors" + def __init__( + self, + name: Optional[str] = None, + description: Optional[str] = None, + save_metadata: bool = True, + save_artifact_path: Optional[str] = "./artifacts", + save_metadata_path: Optional[str] = "./metadata", + save_error_path: Optional[str] = "./errors", + ): + super().__init__() + self.name = name + self.description = description + self.save_metadata = save_metadata + self.save_artifact_path = save_artifact_path + self.save_metadata_path = save_metadata_path + self.save_error_path = save_error_path + def run(self, *args, **kwargs): """Run the structure.""" diff --git a/swarms/structs/base_swarm.py b/swarms/structs/base_swarm.py index 4a39488e..72ba9463 100644 --- a/swarms/structs/base_swarm.py +++ b/swarms/structs/base_swarm.py @@ -16,7 +16,7 @@ import yaml from swarms.structs.agent import Agent from swarms.structs.conversation import Conversation from swarms.utils.loguru_logger import logger -from swarms.structs.omni_agent_types import agent +from swarms.structs.omni_agent_types import AgentType class BaseSwarm(ABC): @@ -68,15 +68,18 @@ class BaseSwarm(ABC): def __init__( self, - # name: str = "Swarm", - agents: List[Agent] = None, - models: List[Any] = None, - max_loops: int = 200, + name: Optional[str] = None, + description: Optional[str] = None, + agents: Optional[List[Agent]] = None, + models: Optional[List[Any]] = None, + max_loops: Optional[int] = 200, callbacks: Optional[Sequence[callable]] = None, - autosave: bool = False, - logging: bool = False, - return_metadata: bool = False, - metadata_filename: str = "multiagent_structure_metadata.json", + autosave: Optional[bool] = False, + logging: Optional[bool] = False, + return_metadata: Optional[bool] = False, + metadata_filename: Optional[ + str + ] = "multiagent_structure_metadata.json", stopping_function: Optional[Callable] = None, stopping_condition: Optional[str] = "stop", stopping_condition_args: Optional[Dict] = None, @@ -84,6 +87,8 @@ class BaseSwarm(ABC): **kwargs, ): """Initialize the swarm with agents""" + self.name = name + self.description = description self.agents = agents self.models = models self.max_loops = max_loops @@ -96,31 +101,11 @@ class BaseSwarm(ABC): self.stopping_condition = stopping_condition self.stopping_condition_args = stopping_condition_args + # Initialize conversation self.conversation = Conversation( time_enabled=True, *args, **kwargs ) - # Handle the case where the agents are not provided - # Handle agents - for agent in self.agents: - if not isinstance(agent, Agent): - raise TypeError("Agents must be of type Agent.") - - if self.agents is None: - self.agents = [] - - # Handle the case where the callbacks are not provided - if self.callbacks is None: - self.callbacks = [] - - # Handle the case where the autosave is not provided - if self.autosave is None: - self.autosave = False - - # Handle the case where the logging is not provided - if self.logging is None: - self.logging = False - # Handle callbacks if callbacks is not None: for callback in self.callbacks: @@ -154,11 +139,9 @@ class BaseSwarm(ABC): self.stopping_condition_args = stopping_condition_args self.stopping_condition = stopping_condition - # @abstractmethod def communicate(self): """Communicate with the swarm through the orchestrator, protocols, and the universal communication layer""" - # @abstractmethod def run(self): """Run the swarm""" ... @@ -186,12 +169,11 @@ class BaseSwarm(ABC): def step(self): """Step the swarm""" - # @abstractmethod - def add_agent(self, agent: agent): + def add_agent(self, agent: AgentType): """Add a agent to the swarm""" self.agents.append(agent) - def add_agents(self, agents: List[agent]): + def add_agents(self, agents: List[AgentType]): """Add a list of agents to the swarm""" self.agents.extend(agents) @@ -200,8 +182,7 @@ class BaseSwarm(ABC): agent = self.get_agent_by_id(agent_id) self.add_agent(agent) - # @abstractmethod - def remove_agent(self, agent: agent): + def remove_agent(self, agent: AgentType): """Remove a agent from the swarm""" self.agents.remove(agent) @@ -211,88 +192,70 @@ class BaseSwarm(ABC): if agent.name == name: return agent - # @abstractmethod - def broadcast(self, message: str, sender: Optional[agent] = None): + def reset_all_agents(self): + """Resets the state of all agents.""" + for agent in self.agents: + agent.reset() + + def broadcast(self, message: str, sender: Optional[AgentType] = None): """Broadcast a message to all agents""" - # @abstractmethod def reset(self): """Reset the swarm""" - # @abstractmethod def plan(self, task: str): """agents must individually plan using a workflow or pipeline""" - # @abstractmethod def direct_message( self, message: str, - sender: agent, - recipient: agent, + sender: AgentType, + recipient: AgentType, ): """Send a direct message to a agent""" - # @abstractmethod - def autoscaler(self, num_agents: int, agent: [agent]): + def autoscaler(self, num_agents: int, agent: List[AgentType]): """Autoscaler that acts like kubernetes for autonomous agents""" - # @abstractmethod - def get_agent_by_id(self, id: str) -> agent: + def get_agent_by_id(self, id: str) -> AgentType: """Locate a agent by id""" - # @abstractmethod - def get_agent_by_name(self, name: str) -> agent: - """Locate a agent by name""" - - # @abstractmethod - def assign_task(self, agent: agent, task: Any) -> Dict: + def assign_task(self, agent: AgentType, task: Any) -> Dict: """Assign a task to a agent""" - # @abstractmethod - def get_all_tasks(self, agent: agent, task: Any): + def get_all_tasks(self, agent: AgentType, task: Any): """Get all tasks""" - # @abstractmethod def get_finished_tasks(self) -> List[Dict]: """Get all finished tasks""" - # @abstractmethod def get_pending_tasks(self) -> List[Dict]: """Get all pending tasks""" - # @abstractmethod - def pause_agent(self, agent: agent, agent_id: str): + def pause_agent(self, agent: AgentType, agent_id: str): """Pause a agent""" - # @abstractmethod - def resume_agent(self, agent: agent, agent_id: str): + def resume_agent(self, agent: AgentType, agent_id: str): """Resume a agent""" - # @abstractmethod - def stop_agent(self, agent: agent, agent_id: str): + def stop_agent(self, agent: AgentType, agent_id: str): """Stop a agent""" - # @abstractmethod - def restart_agent(self, agent: agent): + def restart_agent(self, agent: AgentType): """Restart agent""" - # @abstractmethod def scale_up(self, num_agent: int): """Scale up the number of agents""" - # @abstractmethod def scale_down(self, num_agent: int): """Scale down the number of agents""" - # @abstractmethod def scale_to(self, num_agent: int): """Scale to a specific number of agents""" - # @abstractmethod - def get_all_agents(self) -> List[agent]: + def get_all_agents(self) -> List[AgentType]: """Get all agents""" - # @abstractmethod def get_swarm_size(self) -> int: """Get the size of the swarm""" @@ -506,7 +469,6 @@ class BaseSwarm(ABC): ) return list(responses) - # @abstractmethod def add_swarm_entry(self, swarm): """ Add the information of a joined Swarm to the registry. diff --git a/swarms/structs/blockdevice.py b/swarms/structs/blockdevice.py deleted file mode 100644 index 96b8c8e6..00000000 --- a/swarms/structs/blockdevice.py +++ /dev/null @@ -1,25 +0,0 @@ -from dataclasses import dataclass - - -@dataclass -class BlockDevice: - """ - Represents a block device. - - Attributes: - device (str): The device name. - cluster (str): The cluster name. - description (str): A description of the block device. - """ - - def __init__(self, device: str, cluster: str, description: str): - self.device = device - self.cluster = cluster - self.description = description - - def __str__(self): - return ( - f"BlockDevice(device={self.device}," - f" cluster={self.cluster}," - f" description={self.description})" - ) diff --git a/swarms/structs/blocksdict.py b/swarms/structs/blocks_dict.py similarity index 100% rename from swarms/structs/blocksdict.py rename to swarms/structs/blocks_dict.py diff --git a/swarms/structs/blockslist.py b/swarms/structs/blocks_list.py similarity index 100% rename from swarms/structs/blockslist.py rename to swarms/structs/blocks_list.py diff --git a/swarms/structs/conversation.py b/swarms/structs/conversation.py index 2a63ac13..ddbc0c5a 100644 --- a/swarms/structs/conversation.py +++ b/swarms/structs/conversation.py @@ -83,11 +83,11 @@ class Conversation(BaseStructure): self.context_length = context_length # If system prompt is not None, add it to the conversation history - if self.system_prompt: - self.add("system", self.system_prompt) + if self.system_prompt is not None: + self.add("System: ", self.system_prompt) # If tokenizer then truncate - if tokenizer: + if tokenizer is not None: self.truncate_memory_with_tokenizer() def add(self, role: str, content: str, *args, **kwargs): diff --git a/swarms/structs/groupchat.py b/swarms/structs/groupchat.py index fb2adc07..9db97604 100644 --- a/swarms/structs/groupchat.py +++ b/swarms/structs/groupchat.py @@ -2,6 +2,7 @@ import logging from dataclasses import dataclass from typing import Dict, List + from swarms.structs.agent import Agent logger = logging.getLogger(__name__) @@ -126,6 +127,7 @@ class GroupChat: return "\n".join(formatted_messages) +@dataclass class GroupChatManager: """ GroupChatManager @@ -142,9 +144,8 @@ class GroupChatManager: """ - def __init__(self, groupchat: GroupChat, selector: Agent): - self.groupchat = groupchat - self.selector = selector + groupchat: GroupChat + selector: Agent def __call__(self, task: str): """Call 'GroupChatManager' instance as a function. diff --git a/swarms/structs/load_balancer.py b/swarms/structs/load_balancer.py deleted file mode 100644 index e1d02dcb..00000000 --- a/swarms/structs/load_balancer.py +++ /dev/null @@ -1,70 +0,0 @@ -import multiprocessing as mp -from typing import List, Optional - -from swarms.structs.base_structure import BaseStructure - - -class LoadBalancer(BaseStructure): - """ - A load balancer class that distributes tasks among multiple workers. - - Args: - num_workers (int): The number of worker processes to create. - agents (Optional[List]): A list of agents to assign to the load balancer. - *args: Variable length argument list. - **kwargs: Arbitrary keyword arguments. - - Attributes: - num_workers (int): The number of worker processes. - agents (Optional[List]): A list of agents assigned to the load balancer. - tasks (List): A list of tasks to be executed. - results (List): A list of results from the executed tasks. - workers (List): A list of worker processes. - - Methods: - add_task: Add a task to the load balancer. - - """ - - def __init__( - self, - num_workers: int = 1, - agents: Optional[List] = None, - *args, - **kwargs, - ): - super().__init__(*args, **kwargs) - self.num_workers = num_workers - self.agents = agents - self.tasks = [] - self.results = [] - self.workers = [] - self._init_workers() - - def _init_workers(self): - for i in range(self.num_workers): - worker = mp.Process(target=self._worker) - worker.start() - self.workers.append(worker) - - def _worker(self): - while True: - task = self._get_task() - if task is None: - break - result = self._run_task(task) - self._add_result(result) - - def _get_task(self): - if len(self.tasks) == 0: - return None - return self.tasks.pop(0) - - def _run_task(self, task): - return task() - - def _add_result(self, result): - self.results.append(result) - - def add_task(self, task): - self.tasks.append(task) diff --git a/swarms/structs/message.py b/swarms/structs/message.py index 20a90fe5..de9d13d6 100644 --- a/swarms/structs/message.py +++ b/swarms/structs/message.py @@ -1,4 +1,5 @@ import datetime +from typing import Dict, Optional class Message: @@ -15,13 +16,18 @@ class Message: print(mes) """ - def __init__(self, sender, content, metadata=None): - self.timestamp = datetime.datetime.now() - self.sender = sender - self.content = content - self.metadata = metadata or {} + def __init__( + self, + sender: str, + content: str, + metadata: Optional[Dict[str, str]] = None, + ): + self.timestamp: datetime.datetime = datetime.datetime.now() + self.sender: str = sender + self.content: str = content + self.metadata: Dict[str, str] = metadata or {} - def __repr__(self): + def __repr__(self) -> str: """ __repr__ means... """ diff --git a/swarms/structs/multi_agent_collab.py b/swarms/structs/multi_agent_collab.py index 5010378b..2caf4b8e 100644 --- a/swarms/structs/multi_agent_collab.py +++ b/swarms/structs/multi_agent_collab.py @@ -7,6 +7,7 @@ from langchain.output_parsers import RegexParser from swarms.structs.agent import Agent from swarms.utils.logger import logger +from swarms.structs.base_swarm import BaseSwarm # utils @@ -24,7 +25,7 @@ bid_parser = BidOutputParser( # main -class MultiAgentCollaboration: +class MultiAgentCollaboration(BaseSwarm): """ Multi-agent collaboration class. @@ -94,7 +95,10 @@ class MultiAgentCollaboration: saved_file_path_name: str = "multi_agent_collab.json", stopping_token: str = "", logging: bool = True, + *args, + **kwargs, ): + super().__init__(*args, **kwargs) self.agents = agents self.select_next_speaker = selection_function self._step = 0 @@ -106,21 +110,12 @@ class MultiAgentCollaboration: self.logger = logger self.logging = logging - def reset(self): - """Resets the state of all agents.""" - for agent in self.agents: - agent.reset() - def inject(self, name: str, message: str): """Injects a message into the multi-agent collaboration.""" for agent in self.agents: agent.run(f"Name {name} and message: {message}") self._step += 1 - def inject_agent(self, agent: Agent): - """Injects an agent into the multi-agent collaboration.""" - self.agents.append(agent) - def step(self) -> tuple[str, str]: """Steps through the multi-agent collaboration.""" speaker_idx = self.select_next_speaker(self._step, self.agents) @@ -213,46 +208,6 @@ class MultiAgentCollaboration: idx = director.select_next_speaker() + 1 return idx - # def run(self, task: str): - # """Runs the multi-agent collaboration.""" - # for step in range(self.max_iters): - # speaker_idx = self.select_next_speaker_roundtable(step, self.agents) - # speaker = self.agents[speaker_idx] - # result = speaker.run(task) - # self.results.append({"agent": speaker, "response": result}) - - # if self.autosave: - # self.save_state() - # if result == self.stopping_token: - # break - # return self.results - - # def run(self, task: str): - # for _ in range(self.max_iters): - # for step, agent, in enumerate(self.agents): - # result = agent.run(task) - # self.results.append({"agent": agent, "response": result}) - # if self.autosave: - # self.save_state() - # if result == self.stopping_token: - # break - - # return self.results - - # def run(self, task: str): - # conversation = task - # for _ in range(self.max_iters): - # for agent in self.agents: - # result = agent.run(conversation) - # self.results.append({"agent": agent, "response": result}) - # conversation = result - - # if self.autosave: - # self.save() - # if result == self.stopping_token: - # break - # return self.results - def run(self, task: str): conversation = task for _ in range(self.max_iters): @@ -306,14 +261,3 @@ class MultiAgentCollaboration: f" max_iters={self.max_iters}, autosave={self.autosave}," f" saved_file_path_name={self.saved_file_path_name})" ) - - def performance(self): - """Tracks and reports the performance of each agent""" - performance_data = {} - for agent in self.agents: - performance_data[agent.name] = agent.get_performance_metrics() - return performance_data - - def set_interaction_rules(self, rules): - """Sets the interaction rules for each agent""" - self.interaction_rules = rules diff --git a/swarms/structs/omni_agent_types.py b/swarms/structs/omni_agent_types.py index 30d65895..db3ca664 100644 --- a/swarms/structs/omni_agent_types.py +++ b/swarms/structs/omni_agent_types.py @@ -9,7 +9,7 @@ from swarms.models.base_multimodal_model import BaseMultiModalModel from swarms.structs.agent import Agent # Unified type for agent -agent = Union[Agent, Callable, Any, AbstractLLM, BaseMultiModalModel] +AgentType = Union[Agent, Callable, Any, AbstractLLM, BaseMultiModalModel] # List of agents -agents = Sequence[agent] +AgentListType = Sequence[AgentType] diff --git a/swarms/structs/schemas.py b/swarms/structs/schemas.py index c465b4df..cc3068b7 100644 --- a/swarms/structs/schemas.py +++ b/swarms/structs/schemas.py @@ -119,13 +119,13 @@ class Status(Enum): completed = "completed" -class Step(StepRequestBody): +class Step(BaseModel): task_id: str = Field( ..., description="The ID of the task this step belongs to.", examples=["50da533e-3904-4401-8a07-c49adf88b5eb"], ) - step_id: str = Field( + step_id: int = Field( ..., description="The ID of the task step.", examples=["6bb1801a-fd80-45e8-899a-4dd723cc602e"], @@ -135,7 +135,6 @@ class Step(StepRequestBody): description="The name of the task step.", examples=["Write to file"], ) - status: Status = Field(..., description="The status of the task step.") output: str | None = Field( None, description="Output of the task step.", @@ -145,12 +144,23 @@ class Step(StepRequestBody): " None: """Resets the workflow by clearing the results of each task.""" try: diff --git a/swarms/structs/swarm_redis_registry.py b/swarms/structs/swarm_redis_registry.py deleted file mode 100644 index 45e2c1a9..00000000 --- a/swarms/structs/swarm_redis_registry.py +++ /dev/null @@ -1,176 +0,0 @@ -from dataclasses import asdict -from typing import List - -import networkx as nx -import redis -from redis.commands.graph import Graph, Node - -from swarms.structs.agent import Agent -from swarms.structs.base_swarm import BaseSwarm - - -class SwarmRelationship: - JOINED = "joined" - - -class RedisSwarmRegistry(BaseSwarm): - """ - Initialize the SwarmRedisRegistry object. - - Args: - host (str): The hostname or IP address of the Redis server. Default is "localhost". - port (int): The port number of the Redis server. Default is 6379. - db: The Redis database number. Default is 0. - graph_name (str): The name of the RedisGraph graph. Default is "swarm_registry". - """ - - def __init__( - self, - host: str = "localhost", - port: int = 6379, - db=0, - graph_name: str = "swarm_registry", - ): - self.redis = redis.StrictRedis( - host=host, port=port, db=db, decode_responses=True - ) - self.redis_graph = Graph(self.redis, graph_name) - self.graph = nx.DiGraph() - - def _entity_to_node(self, entity: Agent | Agent) -> Node: - """ - Converts an Agent or Swarm object to a Node object. - - Args: - entity (Agent | Agent): The Agent or Swarm object to convert. - - Returns: - Node: The converted Node object. - """ - return Node( - node_id=entity.id, - alias=entity.agent_name, - label=entity.agent_description, - properties=asdict(entity), - ) - - def _add_node(self, node: Agent | Agent): - """ - Adds a node to the graph. - - Args: - node (Agent | Agent): The Agent or Swarm node to add. - """ - self.graph.add_node(node.id) - if isinstance(node, Agent): - self.add_swarm_entry(node) - elif isinstance(node, Agent): - self.add_agent_entry(node) - - def _add_edge(self, from_node: Node, to_node: Node, relationship): - """ - Adds an edge between two nodes in the graph. - - Args: - from_node (Node): The source node of the edge. - to_node (Node): The target node of the edge. - relationship: The relationship type between the nodes. - """ - match_query = ( - f"MATCH (a:{from_node.label}),(b:{to_node.label}) WHERE" - f" a.id = {from_node.id} AND b.id = {to_node.id}" - ) - - query = f""" - {match_query} - CREATE (a)-[r:joined]->(b) RETURN r - """.replace( - "\n", "" - ) - - self.redis_graph.query(query) - - def add_swarm_entry(self, swarm: Agent): - """ - Adds a swarm entry to the graph. - - Args: - swarm (Agent): The swarm object to add. - """ - node = self._entity_to_node(swarm) - self._persist_node(node) - - def add_agent_entry(self, agent: Agent): - """ - Adds an agent entry to the graph. - - Args: - agent (Agent): The agent object to add. - """ - node = self._entity_to_node(agent) - self._persist_node(node) - - def join_swarm( - self, - from_entity: Agent | Agent, - to_entity: Agent, - ): - """ - Adds an edge between two nodes in the graph. - - Args: - from_entity (Agent | Agent): The source entity of the edge. - to_entity (Agent): The target entity of the edge. - - Returns: - Any: The result of adding the edge. - """ - from_node = self._entity_to_node(from_entity) - to_node = self._entity_to_node(to_entity) - - return self._add_edge(from_node, to_node, SwarmRelationship.JOINED) - - def _persist_node(self, node: Node): - """ - Persists a node in the graph. - - Args: - node (Node): The node to persist. - """ - query = f"CREATE {node}" - self.redis_graph.query(query) - - def retrieve_swarm_information(self, swarm_id: int) -> Agent: - """ - Retrieves swarm information from the registry. - - Args: - swarm_id (int): The ID of the swarm to retrieve. - - Returns: - Agent: The retrieved swarm information as an Agent object. - """ - swarm_key = f"swarm:{swarm_id}" - swarm_data = self.redis.hgetall(swarm_key) - if swarm_data: - # Parse the swarm_data and return an instance of AgentBase - # You can use the retrieved data to populate the AgentBase attributes - - return Agent(**swarm_data) - return None - - def retrieve_joined_agents(self) -> List[Agent]: - """ - Retrieves a list of joined agents from the registry. - - Returns: - List[Agent]: The retrieved joined agents as a list of Agent objects. - """ - agent_data = self.redis_graph.query( - "MATCH (a:agent)-[:joined]->(b:manager) RETURN a" - ) - if agent_data: - # Parse the agent_data and return an instance of AgentBase - # You can use the retrieved data to populate the AgentBase attributes - return [Agent(**agent_data) for agent_data in agent_data] - return None diff --git a/swarms/tools/__init__.py b/swarms/tools/__init__.py index 6f7e5dc5..1c723993 100644 --- a/swarms/tools/__init__.py +++ b/swarms/tools/__init__.py @@ -14,6 +14,13 @@ from swarms.tools.tool_utils import ( scrape_tool_func_docs, tool_find_by_name, ) +from swarms.tools.pydantic_to_json import ( + _remove_a_key, + pydantic_to_functions, + multi_pydantic_to_functions, + function_to_str, + functions_to_str, +) __all__ = [ "scrape_tool_func_docs", @@ -31,4 +38,9 @@ __all__ = [ "preprocess_json_input", "AgentOutputParser", "execute_tool_by_name", + "_remove_a_key", + "pydantic_to_functions", + "multi_pydantic_to_functions", + "function_to_str", + "functions_to_str", ] diff --git a/swarms/tools/pydantic_to_json.py b/swarms/tools/pydantic_to_json.py new file mode 100644 index 00000000..96fcf789 --- /dev/null +++ b/swarms/tools/pydantic_to_json.py @@ -0,0 +1,134 @@ +from typing import Any, Optional, List + +from docstring_parser import parse +from pydantic import BaseModel + + +def _remove_a_key(d: dict, remove_key: str) -> None: + """Remove a key from a dictionary recursively""" + if isinstance(d, dict): + for key in list(d.keys()): + if key == remove_key and "type" in d.keys(): + del d[key] + else: + _remove_a_key(d[key], remove_key) + + +def pydantic_to_functions( + pydantic_type: type[BaseModel], +) -> dict[str, Any]: + """ + Convert a Pydantic model to a dictionary representation of functions. + + Args: + pydantic_type (type[BaseModel]): The Pydantic model type to convert. + + Returns: + dict[str, Any]: A dictionary representation of the functions. + + """ + schema = pydantic_type.model_json_schema() + + docstring = parse(pydantic_type.__doc__ or "") + parameters = { + k: v + for k, v in schema.items() + if k not in ("title", "description") + } + + for param in docstring.params: + if (name := param.arg_name) in parameters["properties"] and ( + description := param.description + ): + if "description" not in parameters["properties"][name]: + parameters["properties"][name]["description"] = description + + parameters["type"] = "object" + + if "description" not in schema: + if docstring.short_description: + schema["description"] = docstring.short_description + else: + schema["description"] = ( + f"Correctly extracted `{pydantic_type.__class__.__name__.lower()}` with all " + f"the required parameters with correct types" + ) + + _remove_a_key(parameters, "title") + _remove_a_key(parameters, "additionalProperties") + + return { + "function_call": { + "name": pydantic_type.__class__.__name__.lower(), + }, + "functions": [ + { + "name": pydantic_type.__class__.__name__.lower(), + "description": schema["description"], + "parameters": parameters, + }, + ], + } + + +def multi_pydantic_to_functions( + pydantic_types: List[BaseModel] = None +) -> dict[str, Any]: + """ + Converts multiple Pydantic types to a dictionary of functions. + + Args: + pydantic_types (List[BaseModel]]): A list of Pydantic types to convert. + + Returns: + dict[str, Any]: A dictionary containing the converted functions. + + """ + functions: list[dict[str, Any]] = [ + pydantic_to_functions(pydantic_type)["functions"][0] + for pydantic_type in pydantic_types + ] + + return { + "function_call": "auto", + "functions": functions, + } + + +def function_to_str(function: dict[str, Any]) -> str: + """ + Convert a function dictionary to a string representation. + + Args: + function (dict[str, Any]): The function dictionary to convert. + + Returns: + str: The string representation of the function. + + """ + function_str = f"Function: {function['name']}\n" + function_str += f"Description: {function['description']}\n" + function_str += "Parameters:\n" + + for param, details in function["parameters"]["properties"].items(): + function_str += f" {param} ({details['type']}): {details.get('description', '')}\n" + + return function_str + + +def functions_to_str(functions: list[dict[str, Any]]) -> str: + """ + Convert a list of function dictionaries to a string representation. + + Args: + functions (list[dict[str, Any]]): The list of function dictionaries to convert. + + Returns: + str: The string representation of the functions. + + """ + functions_str = "" + for function in functions: + functions_str += function_to_str(function) + "\n" + + return functions_str