[FEAT][Agent][Structured Outputs]

pull/449/head
Kye 9 months ago
parent 9cbbdc9a10
commit 5728bff63a

@ -0,0 +1,54 @@
from pydantic import BaseModel, Field
from swarms import Anthropic
from swarms import Agent
# Initialize the schema for the person's information
class Schema(BaseModel):
name: str = Field(..., title="Name of the person")
agent: int = Field(..., title="Age of the person")
is_student: bool = Field(..., title="Whether the person is a student")
courses: list[str] = Field(
..., title="List of courses the person is taking"
)
# Convert the schema to a JSON string
tool_schema = Schema(
name="Tool Name",
agent=1,
is_student=True,
courses=["Course1", "Course2"],
)
# Define the task to generate a person's information
task = "Generate a person's information based on the following schema:"
# Initialize the agent
agent = Agent(
agent_name="Person Information Generator",
system_prompt=(
"Generate a person's information based on the following schema:"
),
# Set the tool schema to the JSON string -- this is the key difference
tool_schema=tool_schema,
llm=Anthropic(),
max_loops=3,
autosave=True,
dashboard=False,
streaming_on=True,
verbose=True,
interactive=True,
# Set the output type to the tool schema which is a BaseModel
output_type=tool_schema, # or dict, or str
metadata_output_type="json",
# List of schemas that the agent can handle
list_tool_schemas = [tool_schema],
function_calling_format_type = "OpenAI",
function_calling_type = "json" # or soon yaml
)
# Run the agent to generate the person's information
generated_data = agent.run(task)
# Print the generated data
print(f"Generated data: {generated_data}")

@ -0,0 +1,98 @@
swarms
pip install swarms
swarms is the most pythonic way of writing cognitive systems. Leveraging pydantic models as output schemas combined with langchain in the backend allows for a seamless integration of llms into your apps. It utilizes OpenAI Functions or LlamaCpp grammars (json-schema-mode) for efficient structured output. In the backend it compiles the swarms syntax into langchain runnables so you can easily invoke, stream or batch process your pipelines.
Open in GitHub Codespaces
from pydantic import BaseModel, Field
from swarms import Anthropic
from swarms import Agent
# Initialize the schema for the person's information
class Schema(BaseModel):
name: str = Field(..., title="Name of the person")
agent: int = Field(..., title="Age of the person")
is_student: bool = Field(..., title="Whether the person is a student")
courses: list[str] = Field(
..., title="List of courses the person is taking"
)
# Convert the schema to a JSON string
tool_schema = Schema(
name="Tool Name",
agent=1,
is_student=True,
courses=["Course1", "Course2"],
)
# Define the task to generate a person's information
task = "Generate a person's information based on the following schema:"
# Initialize the agent
agent = Agent(
agent_name="Person Information Generator",
system_prompt=(
"Generate a person's information based on the following schema:"
),
# Set the tool schema to the JSON string -- this is the key difference
tool_schema=tool_schema,
llm=Anthropic(),
max_loops=3,
autosave=True,
dashboard=False,
streaming_on=True,
verbose=True,
interactive=True,
# Set the output type to the tool schema which is a BaseModel
output_type=tool_schema, # or dict, or str
metadata_output_type="json",
# List of schemas that the agent can handle
list_tool_schemas = [tool_schema],
function_calling_format_type = "OpenAI",
function_calling_type = "json" # or soon yaml
)
# Run the agent to generate the person's information
generated_data = agent.run(task)
# Print the generated data
print(f"Generated data: {generated_data}")
Features
🐍 pythonic
🔀 easy swap between openai or local models
🔄 dynamic output types (pydantic models, or primitives)
👁️ vision llm support
🧠 langchain_core as backend
📝 jinja templating for prompts
🏗️ reliable structured output
🔁 auto retry parsing
🔧 langsmith support
🔄 sync, async, streaming, parallel, fallbacks
📦 gguf download from huggingface
✅ type hints for all functions and mypy support
🗣️ chat router component
🧩 composable with langchain LCEL
🛠️ easy error handling
🚦 enums and literal support
📐 custom parsing types
Documentation
Checkout the docs here 👈
Also highly recommend to try and run the examples in the ./examples folder.
Contribution
You want to contribute? Thanks, that's great! For more information checkout the Contributing Guide. Please run the dev setup to get started:
git clone https://github.com/kyegomez/swarms.git && cd swarms
./dev_setup.sh
About
⛓️ build cognitive systems, pythonic

@ -0,0 +1,7 @@
from swarms import AutoSwarm
# Initialize the swarm
swarm = AutoSwarm("kyegomez/myswarm")
# Run the swarm
swarm.run("Generate a 10,000 word blog on health and wellness.")

@ -1,4 +1,4 @@
from swarms import BaseSwarm, AutoSwarm, AutoSwarmRouter, Agent, Anthropic
from swarms import BaseSwarm, Agent, Anthropic
class MarketingSwarm(BaseSwarm):

@ -1,4 +1,4 @@
from swarms import AutoSwarm, AutoSwarmRouter, BaseSwarm, Agent, Anthropic
from swarms import BaseSwarm, Agent, Anthropic
class MySwarm(BaseSwarm):

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "4.8.4"
version = "4.8.6"
description = "Swarms - Pytorch"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]
@ -51,6 +51,7 @@ python-dotenv = "*"
accelerate = "0.28.0"
opencv-python = "^4.9.0.80"
yaml = "*"
docstring_parser = "0.16"
[tool.poetry.group.lint.dependencies]
black = "^23.1.0"
@ -70,16 +71,6 @@ fastapi = "^0.110.1"
[tool.ruff]
line-length = 75
[tool.ruff.lint]
select = ["E", "F", "W", "I", "UP"]
ignore = []
fixable = ["ALL"]
unfixable = []
preview = true
[tool.ruff.lint.per-file-ignores]
"swarms/prompts/**.py" = ["E501"]
[tool.black]
target-version = ["py38"]
line-length = 75

@ -16,3 +16,4 @@ Pillow==10.2.0
termcolor==2.2.0
psutil
sentry-sdk
docstring_parser==0.16

@ -29,9 +29,6 @@ from swarms.models.popular_llms import (
from swarms.models.popular_llms import (
OpenAILLM as OpenAI,
)
from swarms.models.popular_llms import (
ReplicateLLM as Replicate,
)
from swarms.models.popular_llms import OctoAIChat
from swarms.models.qwen import QwenVLMultiModal # noqa: E402

@ -4,15 +4,12 @@ from langchain_community.chat_models.azure_openai import (
from langchain_community.chat_models.openai import (
ChatOpenAI as OpenAIChat,
)
from langchain_community.llms import (
Anthropic,
Cohere,
MosaicML,
OpenAI,
Replicate,
)
from langchain.llms.anthropic import Anthropic
from langchain.llms.cohere import Cohere
from langchain.llms.mosaicml import MosaicML
from langchain.llms.openai import OpenAI #, OpenAIChat, AzureOpenAI
from langchain_community.llms.octoai_endpoint import OctoAIEndpoint
from langchain.llms.replicate import Replicate
class AnthropicChat(Anthropic):
def __call__(self, *args, **kwargs):
@ -34,7 +31,7 @@ class OpenAILLM(OpenAI):
return self.invoke(*args, **kwargs)
class ReplicateLLM(Replicate):
class ReplicateChat(Replicate):
def __call__(self, *args, **kwargs):
return self.invoke(*args, **kwargs)
@ -45,6 +42,9 @@ class AzureOpenAILLM(AzureChatOpenAI):
class OpenAIChatLLM(OpenAIChat):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def __call__(self, *args, **kwargs):
return self.invoke(*args, **kwargs)

@ -5,7 +5,6 @@ from swarms.structs.agent_process import (
AgentProcessQueue,
)
from swarms.structs.auto_swarm import AutoSwarm, AutoSwarmRouter
from swarms.structs.autoscaler import AutoScaler
from swarms.structs.base_structure import BaseStructure
from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.base_workflow import BaseWorkflow
@ -87,7 +86,6 @@ from swarms.structs.yaml_model import (
YamlModel,
)
__all__ = [
"Agent",
"AgentJob",
@ -95,7 +93,6 @@ __all__ = [
"AgentProcessQueue",
"AutoSwarm",
"AutoSwarmRouter",
"AutoScaler",
"BaseStructure",
"BaseSwarm",
"BaseWorkflow",

@ -1,6 +1,7 @@
import asyncio
import json
import logging
from typing import Union
import os
import random
import sys
@ -26,6 +27,12 @@ from swarms.utils.pdf_to_text import pdf_to_text
from swarms.tools.exec_tool import execute_tool_by_name
from swarms.tools.code_executor import CodeExecutor
from swarms.prompts.worker_prompt import tool_usage_worker_prompt
from pydantic import BaseModel
from swarms.tools.pydantic_to_json import (
pydantic_to_functions,
multi_pydantic_to_functions,
)
from swarms.structs.schemas import Step, ManySteps
# Utils
@ -63,13 +70,17 @@ def step_id():
return str(uuid.uuid1())
# Agent output types
agent_output_type = Union[BaseModel, dict, str]
ToolUsageType = Union[BaseModel, Dict[str, Any]]
# [FEAT][AGENT]
class Agent:
"""
Agent is the backbone to connect LLMs with tools and long term memory. Agent also provides the ability to
ingest any type of docs like PDFs, Txts, Markdown, Json, and etc for the agent. Here is a list of features.
Args:
llm (Any): The language model to use
template (str): The template to use
@ -155,22 +166,22 @@ class Agent:
def __init__(
self,
id: str = agent_id,
llm: Any = None,
id: Optional[str] = agent_id,
llm: Optional[Any] = None,
template: Optional[str] = None,
max_loops: Optional[int] = 1,
stopping_condition: Optional[Callable[[str], bool]] = None,
loop_interval: int = 0,
retry_attempts: int = 3,
retry_interval: int = 1,
return_history: bool = False,
stopping_token: str = None,
loop_interval: Optional[int] = 0,
retry_attempts: Optional[int] = 3,
retry_interval: Optional[int] = 1,
return_history: Optional[bool] = False,
stopping_token: Optional[str] = None,
dynamic_loops: Optional[bool] = False,
interactive: bool = False,
dashboard: bool = False,
agent_name: str = "swarm-worker-01",
agent_description: str = None,
system_prompt: str = AGENT_SYSTEM_PROMPT_3,
interactive: Optional[bool] = False,
dashboard: Optional[bool] = False,
agent_name: Optional[str] = "swarm-worker-01",
agent_description: Optional[str] = None,
system_prompt: Optional[str] = AGENT_SYSTEM_PROMPT_3,
tools: List[BaseTool] = [],
dynamic_temperature_enabled: Optional[bool] = False,
sop: Optional[str] = None,
@ -178,7 +189,7 @@ class Agent:
saved_state_path: Optional[str] = None,
autosave: Optional[bool] = False,
context_length: Optional[int] = 8192,
user_name: str = "Human:",
user_name: Optional[str] = "Human:",
self_healing_enabled: Optional[bool] = False,
code_interpreter: Optional[bool] = False,
multi_modal: Optional[bool] = None,
@ -187,22 +198,22 @@ class Agent:
tokenizer: Optional[Any] = None,
long_term_memory: Optional[AbstractVectorDatabase] = None,
preset_stopping_token: Optional[bool] = False,
traceback: Any = None,
traceback_handlers: Any = None,
traceback: Optional[Any] = None,
traceback_handlers: Optional[Any] = None,
streaming_on: Optional[bool] = False,
docs: List[str] = None,
docs_folder: str = None,
verbose: bool = False,
docs_folder: Optional[str] = None,
verbose: Optional[bool] = False,
parser: Optional[Callable] = None,
best_of_n: Optional[int] = None,
callback: Optional[Callable] = None,
metadata: Optional[Dict[str, Any]] = None,
callbacks: Optional[List[Callable]] = None,
logger_handler: Any = sys.stderr,
logger_handler: Optional[Any] = sys.stderr,
search_algorithm: Optional[Callable] = None,
logs_to_filename: Optional[str] = None,
evaluator: Optional[Callable] = None,
output_json: bool = False,
output_json: Optional[bool] = False,
stopping_func: Optional[Callable] = None,
custom_loop_condition: Optional[Callable] = None,
sentiment_threshold: Optional[float] = None,
@ -210,6 +221,13 @@ class Agent:
sentiment_analyzer: Optional[Callable] = None,
limit_tokens_from_string: Optional[Callable] = None,
custom_tools_prompt: Optional[Callable] = None,
tool_schema: ToolUsageType = None,
output_type: agent_output_type = None,
function_calling_type: str = "json",
output_cleaner: Optional[Callable] = None,
function_calling_format_type: Optional[str] = "OpenAI",
list_tool_schemas: Optional[List[BaseModel]] = None,
metadata_output_type: str = "json",
*args,
**kwargs,
):
@ -270,6 +288,13 @@ class Agent:
self.custom_exit_command = custom_exit_command
self.sentiment_analyzer = sentiment_analyzer
self.limit_tokens_from_string = limit_tokens_from_string
self.tool_schema = tool_schema
self.output_type = output_type
self.function_calling_type = function_calling_type
self.output_cleaner = output_cleaner
self.function_calling_format_type = function_calling_format_type
self.list_tool_schemas = list_tool_schemas
self.metadata_output_type = metadata_output_type
# The max_loops will be set dynamically if the dynamic_loop
if self.dynamic_loops:
@ -293,11 +318,15 @@ class Agent:
)
# If the preset stopping token is enabled then set the stopping token to the preset stopping token
if preset_stopping_token:
if preset_stopping_token is not None:
self.stopping_token = "<DONE>"
# If the stopping function is provided then set the stopping condition to the stopping function
self.short_memory = Conversation(
system_prompt=self.system_prompt, time_enabled=True
system_prompt=system_prompt,
time_enabled=True,
*args,
**kwargs
)
# If the docs exist then ingest the docs
@ -359,6 +388,30 @@ class Agent:
# logger.info("Creating Agent {}".format(self.agent_name))
# If the tool types
if self.tool_schema is not None:
logger.info("Tool schema provided")
tool_schema_str = self.tool_schema_to_str(self.tool_schema)
print(tool_schema_str)
# Add to the short memory
logger.info(f"Adding tool schema to memory: {tool_schema_str}")
self.short_memory.add(
role=self.user_name, content=tool_schema_str
)
# If a list of tool schemas:
if self.list_tool_schemas is not None:
logger.info("Tool schema provided")
tool_schema_str = self.tool_schemas_to_str(list_tool_schemas)
# Add to the short memory
logger.info(f"Adding tool schema to memory: {tool_schema_str}")
self.short_memory.add(
role=self.user_name, content=tool_schema_str
)
def set_system_prompt(self, system_prompt: str):
"""Set the system prompt"""
self.system_prompt = system_prompt
@ -522,6 +575,78 @@ class Agent:
for chunk in content:
print(chunk, end="")
########################## FUNCTION CALLING ##########################
def json_str_to_json(self, json_str: str):
"""Convert a JSON string to a JSON object"""
return json.loads(json_str)
def json_str_to_pydantic_model(self, json_str: str, model: BaseModel):
"""Convert a JSON string to a Pydantic model"""
return model.model_validate_json(json_str)
def json_str_to_dict(self, json_str: str):
"""Convert a JSON string to a dictionary"""
return json.loads(json_str)
def pydantic_model_to_json_str(self, model: BaseModel):
return str(pydantic_to_functions(model))
def dict_to_json_str(self, dictionary: dict):
"""Convert a dictionary to a JSON string"""
return json.dumps(dictionary)
def dict_to_pydantic_model(self, dictionary: dict, model: BaseModel):
"""Convert a dictionary to a Pydantic model"""
return model.model_validate_json(dictionary)
# def prep_pydantic_model_for_str(self, model: BaseModel):
# # Convert to Function
# out = self.pydantic_model_to_json_str(model)
# # return function_to_str(out)
def tool_schema_to_str(
self, tool_schema: BaseModel = None, *args, **kwargs
):
"""Convert a tool schema to a string"""
out = pydantic_to_functions(tool_schema)
return str(out)
def tool_schemas_to_str(
self, tool_schemas: List[BaseModel] = None, *args, **kwargs
):
"""Convert a list of tool schemas to a string"""
out = multi_pydantic_to_functions(tool_schemas)
return str(out)
def str_to_pydantic_model(self, string: str, model: BaseModel):
"""Convert a string to a Pydantic model"""
return model.model_validate_json(string)
def list_str_to_pydantic_model(
self, list_str: List[str], model: BaseModel
):
"""Convert a list of strings to a Pydantic model"""
# return model.model_validate_json(list_str)
for string in list_str:
return model.model_validate_json(string)
def prepare_output_for_output_model(
self, output: agent_output_type = None
):
"""Prepare the output for the output model"""
if self.output_type == BaseModel:
return self.str_to_pydantic_model(output, self.output_type)
elif self.output_type == dict:
return self.dict_to_json_str(output)
elif self.output_type == str:
return output
else:
return output
########################## FUNCTION CALLING ##########################
def _history(self, user_name: str, task: str) -> str:
"""Generate the history for the history prompt
@ -567,6 +692,7 @@ class Agent:
loop_count = 0
response = None
step_pool = []
while (
self.max_loops == "auto"
@ -722,10 +848,37 @@ class Agent:
)
time.sleep(self.loop_interval)
# Save Step Metadata
active_step = Step(
task_id=task_id(),
step_id=loop_count,
name=task,
output=response,
max_loops=self.max_loops,
)
step_pool.append(active_step)
# Save the step pool
# self.step_cache = step_pool
if self.autosave:
logger.info("Autosaving agent state.")
self.save_state(self.saved_state_path)
# Apply the cleaner function to the response
if self.output_cleaner is not None:
response = self.output_cleaner(response)
# Prepare the output for the output model
if self.output_type is not None:
response = self.prepare_output_for_output_model(response)
# List of steps for this task
ManySteps(task_id=task_id(), steps=step_pool)
# Save Many steps
return response
except Exception as error:
print(f"Error running agent: {error}")
@ -790,7 +943,7 @@ class Agent:
Returns:
str: The agent history prompt
"""
ltr = str(self.long_term_memory.query(query), *args, **kwargs)
ltr = self.long_term_memory.query(query, *args, **kwargs)
context = f"""
System: This reminds you of these events from your past: [{ltr}]
@ -1195,7 +1348,7 @@ class Agent:
def reset(self):
"""Reset the agent"""
self.short_memory = {}
self.short_memory = None
def run_code(self, code: str):
"""

@ -2,7 +2,7 @@ from datetime import datetime
from pydantic import BaseModel
from swarms.structs.omni_agent_types import agents
from swarms.structs.omni_agent_types import AgentListType
from swarms.utils.loguru_logger import logger
from typing import Callable
@ -54,7 +54,7 @@ class AgentProcessQueue:
[]
) # Currently use list to simulate queue
def add(self, agents: agents):
def add(self, agents: AgentListType):
"""
Adds an agent process to the queue.

@ -1,71 +1,9 @@
from typing import Any, Callable, Dict, Optional, Sequence
from swarms.models.base_llm import AbstractLLM
from swarms.structs.base_swarm import BaseSwarm
from swarms.utils.loguru_logger import logger
class SequentialAccountingSwarm(BaseSwarm):
"""SequentialAccountingSwarm class represents a swarm of agents that can be created automatically.
Flow:
name -> router -> swarm entry point
Attributes:
name (str, optional): The name of the swarm. Defaults to "kyegomez/sequential-accounting-swarm".
description (str, optional): The description of the swarm. Defaults to None.
verbose (bool): Whether to enable verbose mode or not. Defaults to False.
custom_params (dict, optional): Custom parameters for the swarm. Defaults to None.
iters (int, optional): The number of iterations for the swarm simulation. Defaults to 100.
max_agents (int, optional): The maximum number of agents in the swarm. Defaults to 100.
agents (Sequence[AbstractLLM], optional): The list of agents in the swarm. Defaults to None.
Methods:
run(task: str = None, *args, **kwargs) -> Any:
Run the swarm simulation.
"""
def __init__(
self,
name: Optional[str] = "kyegomez/sequential-accounting-swarm",
description: Optional[str] = None,
verbose: bool = False,
custom_params: Optional[Dict[str, Any]] = None,
iters: Optional[int] = 100,
max_agents: Optional[int] = 100,
agents: Sequence[AbstractLLM] = None,
*args,
**kwargs,
):
super().__init__()
self.name = name
self.description = description
self.verbose = verbose
self.custom_params = custom_params
self.iters = iters
self.max_agents = max_agents
self.agents = agents
def run(self, task: str = None, *args, **kwargs):
"""Run the swarm simulation.
Args:
task (str, optional): The task to be performed by the agents. Defaults to None.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Returns:
Any: The final result of the swarm simulation.
"""
for agent in self.agents:
out = agent.run(task, *args, **kwargs)
final = agent.run(out)
return final
class AutoSwarmRouter(BaseSwarm):
"""AutoSwarmRouter class represents a router for the AutoSwarm class.
@ -175,7 +113,6 @@ class AutoSwarm(BaseSwarm):
description: Optional[str] = None,
verbose: bool = False,
custom_params: Optional[Dict[str, Any]] = None,
router: Optional[AutoSwarmRouter] = None,
custom_preprocess: Optional[Callable] = None,
custom_postprocess: Optional[Callable] = None,
custom_router: Optional[Callable] = None,
@ -187,7 +124,6 @@ class AutoSwarm(BaseSwarm):
self.description = description
self.verbose = verbose
self.custom_params = custom_params
self.router = router
self.custom_preprocess = custom_preprocess
self.custom_postprocess = custom_postprocess
self.router = AutoSwarmRouter(

@ -1,329 +0,0 @@
import logging
import queue
import threading
from time import sleep
from typing import Callable, Dict, List, Optional
from termcolor import colored
from swarms.structs.agent import Agent
from swarms.structs.base_structure import BaseStructure
from swarms.utils.decorators import (
error_decorator,
log_decorator,
timing_decorator,
)
class AutoScaler(BaseStructure):
"""
AutoScaler class
The AutoScaler class is responsible for managing the agents pool
and the task queue. It also monitors the health of the agents and
scales the pool up or down based on the number of pending tasks
and the current load of the agents.
Args:
initial_agents (Optional[int], optional): Initial number of
agents to start with. Defaults to 10.
scale_up_factor (int, optional): Factor by which to scale up
the agents pool. Defaults to 1.
idle_threshold (float, optional): Threshold for scaling down
the agents pool. Defaults to 0.2.
busy_threshold (float, optional): Threshold for scaling up
the agents pool. Defaults to 0.7.
agents (List[Agent], optional): List of agents to use in the
pool. Defaults to None.
autoscale (Optional[bool], optional): Whether to autoscale
the agents pool. Defaults to True.
min_agents (Optional[int], optional): Minimum number of
agents to keep in the pool. Defaults to 10.
max_agents (Optional[int], optional): Maximum number of
agents to keep in the pool. Defaults to 100.
custom_scale_strategy (Optional[Callable], optional): Custom
scaling strategy to use. Defaults to None.
Methods:
add_task: Add tasks to queue
scale_up: Add more agents
scale_down: scale down
run: Run agent the task on the agent id
monitor_and_scale: Monitor and scale
start: Start scaling
check_agent_health: Checks the health of each agent and
replaces unhealthy agents.
balance_load: Distributes tasks among agents based on their
current load.
set_scaling_strategy: Set a custom scaling strategy.
execute_scaling_strategy: Execute the custom scaling strategy
if defined.
report_agent_metrics: Collects and reports metrics from each
agent.
report: Reports the current state of the autoscaler.
print_dashboard: Prints a dashboard of the current state of
the autoscaler.
Examples:
>>> import os
>>> from dotenv import load_dotenv
>>> # Import the OpenAIChat model and the Agent struct
>>> from swarms.models import OpenAIChat
>>> from swarms.structs import Agent
>>> from swarms.structs.autoscaler import AutoScaler
>>> # Load the environment variables
>>> load_dotenv()
>>> # Get the API key from the environment
>>> api_key = os.environ.get("OPENAI_API_KEY")
>>> # Initialize the language model
>>> llm = OpenAIChat(
... temperature=0.5,
... openai_api_key=api_key,
... )
>>> ## Initialize the workflow
>>> agent = Agent(llm=llm, max_loops=1, dashboard=True)
>>> # Load the autoscaler
>>> autoscaler = AutoScaler(
... initial_agents=2,
... scale_up_factor=1,
... idle_threshold=0.2,
... busy_threshold=0.7,
... agents=[agent],
... autoscale=True,
... min_agents=1,
... max_agents=5,
... custom_scale_strategy=None,
... )
>>> print(autoscaler)
>>> # Run the workflow on a task
>>> out = autoscaler.run(agent.id, "Generate a 10,000 word blog on health and wellness.")
>>> print(out)
"""
@log_decorator
@error_decorator
@timing_decorator
def __init__(
self,
initial_agents: Optional[int] = 10,
scale_up_factor: int = 1,
idle_threshold: float = 0.2,
busy_threshold: float = 0.7,
agents: List[Agent] = None,
autoscale: Optional[bool] = True,
min_agents: Optional[int] = 10,
max_agents: Optional[int] = 100,
custom_scale_strategy: Optional[Callable] = None,
*args,
**kwargs,
):
self.agents_pool = agents or [
agents[0]() for _ in range(initial_agents)
]
self.task_queue = queue.Queue()
self.scale_up_factor = scale_up_factor
self.idle_threshold = idle_threshold
self.busy_threshold = busy_threshold
self.lock = threading.Lock()
self.agents = agents
self.autoscale = autoscale
self.min_agents = min_agents
self.max_agents = max_agents
self.custom_scale_strategy = custom_scale_strategy
def add_task(self, task):
"""Add tasks to queue"""
try:
self.task_queue.put(task)
except Exception as error:
print(
f"Error adding task to queue: {error} try again with"
" a new task"
)
@log_decorator
@error_decorator
@timing_decorator
def scale_up(self):
"""Add more agents"""
try:
with self.lock:
new_agents_counts = (
len(self.agents_pool) * self.scale_up_factor
)
for _ in range(new_agents_counts):
self.agents_pool.append(self.agents[0]())
except Exception as error:
print(f"Error scaling up: {error} try again with a new task")
def scale_down(self):
"""scale down"""
try:
with self.lock:
if (
len(self.agents_pool) > 10
): # ensure minmum of 10 agents
del self.agents_pool[-1] # remove last agent
except Exception as error:
print(
f"Error scaling down: {error} try again with a new" " task"
)
def run(self, agent_id, task: Optional[str] = None, *args, **kwargs):
"""Run agent the task on the agent id
Args:
agent_id (_type_): _description_
task (str, optional): _description_. Defaults to None.
Raises:
ValueError: _description_
Returns:
_type_: _description_
"""
for agent in self.agents_pool:
if agent.id == agent_id:
return agent.run(task, *args, **kwargs)
raise ValueError(f"No agent found with ID {agent_id}")
@log_decorator
@error_decorator
@timing_decorator
def monitor_and_scale(self):
"""Monitor and scale"""
try:
while True:
sleep(60) # check minute
pending_tasks = self.task_queue.qsize()
active_agents = sum(
[1 for agent in self.agents_pool if agent.is_busy()]
)
if (
pending_tasks / len(self.agents_pool)
> self.busy_threshold
):
self.scale_up()
elif (
active_agents / len(self.agents_pool)
< self.idle_threshold
):
self.scale_down()
except Exception as error:
print(
f"Error monitoring and scaling: {error} try again"
" with a new task"
)
@log_decorator
@error_decorator
@timing_decorator
def start(self):
"""Start scaling"""
try:
monitor_thread = threading.Thread(
target=self.monitor_and_scale
)
monitor_thread.start()
while True:
task = self.task_queue.get()
if task:
available_agent = next(
agent for agent in self.agents_pool
)
if available_agent:
available_agent.run(task)
except Exception as error:
print(f"Error starting: {error} try again with a new task")
def check_agent_health(self):
"""Checks the health of each agent and replaces unhealthy agents."""
for i, agent in enumerate(self.agents_pool):
if not agent.is_healthy():
logging.warning(f"Replacing unhealthy agent at index {i}")
self.agents_pool[i] = self.agent()
def balance_load(self):
"""Distributes tasks among agents based on their current load."""
try:
while not self.task_queue.empty():
for agent in self.agents_pool:
if agent.can_accept_task():
task = self.task_queue.get()
agent.run(task)
except Exception as error:
print(
f"Error balancing load: {error} try again with a new"
" task"
)
def set_scaling_strategy(self, strategy: Callable[[int, int], int]):
"""Set a custom scaling strategy."""
self.custom_scale_strategy = strategy
def execute_scaling_strategy(self):
"""Execute the custom scaling strategy if defined."""
try:
if hasattr(self, "custom_scale_strategy"):
scale_amount = self.custom_scale_strategy(
self.task_queue.qsize(), len(self.agents_pool)
)
if scale_amount > 0:
for _ in range(scale_amount):
self.agents_pool.append(self.agent())
elif scale_amount < 0:
for _ in range(abs(scale_amount)):
if len(self.agents_pool) > 10:
del self.agents_pool[-1]
except Exception as error:
print(
f"Error executing scaling strategy: {error} try again"
" with a new task"
)
def report_agent_metrics(self) -> Dict[str, List[float]]:
"""Collects and reports metrics from each agent."""
metrics = {
"completion_time": [],
"success_rate": [],
"error_rate": [],
}
for agent in self.agents_pool:
agent_metrics = agent.get_metrics()
for key in metrics.keys():
metrics[key].append(agent_metrics.get(key, 0.0))
return metrics
def report(self):
"""Reports the current state of the autoscaler."""
self.check_agent_health()
self.balance_load()
self.execute_scaling_strategy()
metrics = self.report_agent_metrics()
print(metrics)
def print_dashboard(self):
"""Prints a dashboard of the current state of the autoscaler."""
print(
colored(
f"""
Autoscaler Dashboard
--------------------
Agents: {len(self.agents_pool)}
Initial Agents: {self.initial_agents}
self.scale_up_factor: {self.scale_up_factor}
self.idle_threshold: {self.idle_threshold}
self.busy_threshold: {self.busy_threshold}
self.task_queue.qsize(): {self.task_queue.qsize()}
self.task_queue.empty(): {self.task_queue.empty()}
self.task_queue.full(): {self.task_queue.full()}
self.task_queue.maxsize: {self.task_queue.maxsize}
""",
"cyan",
)
)

@ -12,10 +12,10 @@ try:
import gzip
except ImportError as error:
print(f"Error importing gzip: {error}")
from pydantic import BaseModel
# from pydantic import BaseModel
class BaseStructure(BaseModel):
class BaseStructure:
"""Base structure.
@ -64,12 +64,23 @@ class BaseStructure(BaseModel):
BaseStructure(name=None, description=None, save_metadata=True, save_artifact_path='./artifacts', save_metadata_path='./metadata', save_error_path='./errors')
"""
name: Optional[str] = None
description: Optional[str] = None
save_metadata: bool = True
save_artifact_path: Optional[str] = "./artifacts"
save_metadata_path: Optional[str] = "./metadata"
save_error_path: Optional[str] = "./errors"
def __init__(
self,
name: Optional[str] = None,
description: Optional[str] = None,
save_metadata: bool = True,
save_artifact_path: Optional[str] = "./artifacts",
save_metadata_path: Optional[str] = "./metadata",
save_error_path: Optional[str] = "./errors",
):
super().__init__()
self.name = name
self.description = description
self.save_metadata = save_metadata
self.save_artifact_path = save_artifact_path
self.save_metadata_path = save_metadata_path
self.save_error_path = save_error_path
def run(self, *args, **kwargs):
"""Run the structure."""

@ -16,7 +16,7 @@ import yaml
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.utils.loguru_logger import logger
from swarms.structs.omni_agent_types import agent
from swarms.structs.omni_agent_types import AgentType
class BaseSwarm(ABC):
@ -68,15 +68,18 @@ class BaseSwarm(ABC):
def __init__(
self,
# name: str = "Swarm",
agents: List[Agent] = None,
models: List[Any] = None,
max_loops: int = 200,
name: Optional[str] = None,
description: Optional[str] = None,
agents: Optional[List[Agent]] = None,
models: Optional[List[Any]] = None,
max_loops: Optional[int] = 200,
callbacks: Optional[Sequence[callable]] = None,
autosave: bool = False,
logging: bool = False,
return_metadata: bool = False,
metadata_filename: str = "multiagent_structure_metadata.json",
autosave: Optional[bool] = False,
logging: Optional[bool] = False,
return_metadata: Optional[bool] = False,
metadata_filename: Optional[
str
] = "multiagent_structure_metadata.json",
stopping_function: Optional[Callable] = None,
stopping_condition: Optional[str] = "stop",
stopping_condition_args: Optional[Dict] = None,
@ -84,6 +87,8 @@ class BaseSwarm(ABC):
**kwargs,
):
"""Initialize the swarm with agents"""
self.name = name
self.description = description
self.agents = agents
self.models = models
self.max_loops = max_loops
@ -96,31 +101,11 @@ class BaseSwarm(ABC):
self.stopping_condition = stopping_condition
self.stopping_condition_args = stopping_condition_args
# Initialize conversation
self.conversation = Conversation(
time_enabled=True, *args, **kwargs
)
# Handle the case where the agents are not provided
# Handle agents
for agent in self.agents:
if not isinstance(agent, Agent):
raise TypeError("Agents must be of type Agent.")
if self.agents is None:
self.agents = []
# Handle the case where the callbacks are not provided
if self.callbacks is None:
self.callbacks = []
# Handle the case where the autosave is not provided
if self.autosave is None:
self.autosave = False
# Handle the case where the logging is not provided
if self.logging is None:
self.logging = False
# Handle callbacks
if callbacks is not None:
for callback in self.callbacks:
@ -154,11 +139,9 @@ class BaseSwarm(ABC):
self.stopping_condition_args = stopping_condition_args
self.stopping_condition = stopping_condition
# @abstractmethod
def communicate(self):
"""Communicate with the swarm through the orchestrator, protocols, and the universal communication layer"""
# @abstractmethod
def run(self):
"""Run the swarm"""
...
@ -186,12 +169,11 @@ class BaseSwarm(ABC):
def step(self):
"""Step the swarm"""
# @abstractmethod
def add_agent(self, agent: agent):
def add_agent(self, agent: AgentType):
"""Add a agent to the swarm"""
self.agents.append(agent)
def add_agents(self, agents: List[agent]):
def add_agents(self, agents: List[AgentType]):
"""Add a list of agents to the swarm"""
self.agents.extend(agents)
@ -200,8 +182,7 @@ class BaseSwarm(ABC):
agent = self.get_agent_by_id(agent_id)
self.add_agent(agent)
# @abstractmethod
def remove_agent(self, agent: agent):
def remove_agent(self, agent: AgentType):
"""Remove a agent from the swarm"""
self.agents.remove(agent)
@ -211,88 +192,70 @@ class BaseSwarm(ABC):
if agent.name == name:
return agent
# @abstractmethod
def broadcast(self, message: str, sender: Optional[agent] = None):
def reset_all_agents(self):
"""Resets the state of all agents."""
for agent in self.agents:
agent.reset()
def broadcast(self, message: str, sender: Optional[AgentType] = None):
"""Broadcast a message to all agents"""
# @abstractmethod
def reset(self):
"""Reset the swarm"""
# @abstractmethod
def plan(self, task: str):
"""agents must individually plan using a workflow or pipeline"""
# @abstractmethod
def direct_message(
self,
message: str,
sender: agent,
recipient: agent,
sender: AgentType,
recipient: AgentType,
):
"""Send a direct message to a agent"""
# @abstractmethod
def autoscaler(self, num_agents: int, agent: [agent]):
def autoscaler(self, num_agents: int, agent: List[AgentType]):
"""Autoscaler that acts like kubernetes for autonomous agents"""
# @abstractmethod
def get_agent_by_id(self, id: str) -> agent:
def get_agent_by_id(self, id: str) -> AgentType:
"""Locate a agent by id"""
# @abstractmethod
def get_agent_by_name(self, name: str) -> agent:
"""Locate a agent by name"""
# @abstractmethod
def assign_task(self, agent: agent, task: Any) -> Dict:
def assign_task(self, agent: AgentType, task: Any) -> Dict:
"""Assign a task to a agent"""
# @abstractmethod
def get_all_tasks(self, agent: agent, task: Any):
def get_all_tasks(self, agent: AgentType, task: Any):
"""Get all tasks"""
# @abstractmethod
def get_finished_tasks(self) -> List[Dict]:
"""Get all finished tasks"""
# @abstractmethod
def get_pending_tasks(self) -> List[Dict]:
"""Get all pending tasks"""
# @abstractmethod
def pause_agent(self, agent: agent, agent_id: str):
def pause_agent(self, agent: AgentType, agent_id: str):
"""Pause a agent"""
# @abstractmethod
def resume_agent(self, agent: agent, agent_id: str):
def resume_agent(self, agent: AgentType, agent_id: str):
"""Resume a agent"""
# @abstractmethod
def stop_agent(self, agent: agent, agent_id: str):
def stop_agent(self, agent: AgentType, agent_id: str):
"""Stop a agent"""
# @abstractmethod
def restart_agent(self, agent: agent):
def restart_agent(self, agent: AgentType):
"""Restart agent"""
# @abstractmethod
def scale_up(self, num_agent: int):
"""Scale up the number of agents"""
# @abstractmethod
def scale_down(self, num_agent: int):
"""Scale down the number of agents"""
# @abstractmethod
def scale_to(self, num_agent: int):
"""Scale to a specific number of agents"""
# @abstractmethod
def get_all_agents(self) -> List[agent]:
def get_all_agents(self) -> List[AgentType]:
"""Get all agents"""
# @abstractmethod
def get_swarm_size(self) -> int:
"""Get the size of the swarm"""
@ -506,7 +469,6 @@ class BaseSwarm(ABC):
)
return list(responses)
# @abstractmethod
def add_swarm_entry(self, swarm):
"""
Add the information of a joined Swarm to the registry.

@ -1,25 +0,0 @@
from dataclasses import dataclass
@dataclass
class BlockDevice:
"""
Represents a block device.
Attributes:
device (str): The device name.
cluster (str): The cluster name.
description (str): A description of the block device.
"""
def __init__(self, device: str, cluster: str, description: str):
self.device = device
self.cluster = cluster
self.description = description
def __str__(self):
return (
f"BlockDevice(device={self.device},"
f" cluster={self.cluster},"
f" description={self.description})"
)

@ -83,11 +83,11 @@ class Conversation(BaseStructure):
self.context_length = context_length
# If system prompt is not None, add it to the conversation history
if self.system_prompt:
self.add("system", self.system_prompt)
if self.system_prompt is not None:
self.add("System: ", self.system_prompt)
# If tokenizer then truncate
if tokenizer:
if tokenizer is not None:
self.truncate_memory_with_tokenizer()
def add(self, role: str, content: str, *args, **kwargs):

@ -2,6 +2,7 @@ import logging
from dataclasses import dataclass
from typing import Dict, List
from swarms.structs.agent import Agent
logger = logging.getLogger(__name__)
@ -126,6 +127,7 @@ class GroupChat:
return "\n".join(formatted_messages)
@dataclass
class GroupChatManager:
"""
GroupChatManager
@ -142,9 +144,8 @@ class GroupChatManager:
"""
def __init__(self, groupchat: GroupChat, selector: Agent):
self.groupchat = groupchat
self.selector = selector
groupchat: GroupChat
selector: Agent
def __call__(self, task: str):
"""Call 'GroupChatManager' instance as a function.

@ -1,70 +0,0 @@
import multiprocessing as mp
from typing import List, Optional
from swarms.structs.base_structure import BaseStructure
class LoadBalancer(BaseStructure):
"""
A load balancer class that distributes tasks among multiple workers.
Args:
num_workers (int): The number of worker processes to create.
agents (Optional[List]): A list of agents to assign to the load balancer.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Attributes:
num_workers (int): The number of worker processes.
agents (Optional[List]): A list of agents assigned to the load balancer.
tasks (List): A list of tasks to be executed.
results (List): A list of results from the executed tasks.
workers (List): A list of worker processes.
Methods:
add_task: Add a task to the load balancer.
"""
def __init__(
self,
num_workers: int = 1,
agents: Optional[List] = None,
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self.num_workers = num_workers
self.agents = agents
self.tasks = []
self.results = []
self.workers = []
self._init_workers()
def _init_workers(self):
for i in range(self.num_workers):
worker = mp.Process(target=self._worker)
worker.start()
self.workers.append(worker)
def _worker(self):
while True:
task = self._get_task()
if task is None:
break
result = self._run_task(task)
self._add_result(result)
def _get_task(self):
if len(self.tasks) == 0:
return None
return self.tasks.pop(0)
def _run_task(self, task):
return task()
def _add_result(self, result):
self.results.append(result)
def add_task(self, task):
self.tasks.append(task)

@ -1,4 +1,5 @@
import datetime
from typing import Dict, Optional
class Message:
@ -15,13 +16,18 @@ class Message:
print(mes)
"""
def __init__(self, sender, content, metadata=None):
self.timestamp = datetime.datetime.now()
self.sender = sender
self.content = content
self.metadata = metadata or {}
def __init__(
self,
sender: str,
content: str,
metadata: Optional[Dict[str, str]] = None,
):
self.timestamp: datetime.datetime = datetime.datetime.now()
self.sender: str = sender
self.content: str = content
self.metadata: Dict[str, str] = metadata or {}
def __repr__(self):
def __repr__(self) -> str:
"""
__repr__ means...
"""

@ -7,6 +7,7 @@ from langchain.output_parsers import RegexParser
from swarms.structs.agent import Agent
from swarms.utils.logger import logger
from swarms.structs.base_swarm import BaseSwarm
# utils
@ -24,7 +25,7 @@ bid_parser = BidOutputParser(
# main
class MultiAgentCollaboration:
class MultiAgentCollaboration(BaseSwarm):
"""
Multi-agent collaboration class.
@ -94,7 +95,10 @@ class MultiAgentCollaboration:
saved_file_path_name: str = "multi_agent_collab.json",
stopping_token: str = "<DONE>",
logging: bool = True,
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self.agents = agents
self.select_next_speaker = selection_function
self._step = 0
@ -106,21 +110,12 @@ class MultiAgentCollaboration:
self.logger = logger
self.logging = logging
def reset(self):
"""Resets the state of all agents."""
for agent in self.agents:
agent.reset()
def inject(self, name: str, message: str):
"""Injects a message into the multi-agent collaboration."""
for agent in self.agents:
agent.run(f"Name {name} and message: {message}")
self._step += 1
def inject_agent(self, agent: Agent):
"""Injects an agent into the multi-agent collaboration."""
self.agents.append(agent)
def step(self) -> tuple[str, str]:
"""Steps through the multi-agent collaboration."""
speaker_idx = self.select_next_speaker(self._step, self.agents)
@ -213,46 +208,6 @@ class MultiAgentCollaboration:
idx = director.select_next_speaker() + 1
return idx
# def run(self, task: str):
# """Runs the multi-agent collaboration."""
# for step in range(self.max_iters):
# speaker_idx = self.select_next_speaker_roundtable(step, self.agents)
# speaker = self.agents[speaker_idx]
# result = speaker.run(task)
# self.results.append({"agent": speaker, "response": result})
# if self.autosave:
# self.save_state()
# if result == self.stopping_token:
# break
# return self.results
# def run(self, task: str):
# for _ in range(self.max_iters):
# for step, agent, in enumerate(self.agents):
# result = agent.run(task)
# self.results.append({"agent": agent, "response": result})
# if self.autosave:
# self.save_state()
# if result == self.stopping_token:
# break
# return self.results
# def run(self, task: str):
# conversation = task
# for _ in range(self.max_iters):
# for agent in self.agents:
# result = agent.run(conversation)
# self.results.append({"agent": agent, "response": result})
# conversation = result
# if self.autosave:
# self.save()
# if result == self.stopping_token:
# break
# return self.results
def run(self, task: str):
conversation = task
for _ in range(self.max_iters):
@ -306,14 +261,3 @@ class MultiAgentCollaboration:
f" max_iters={self.max_iters}, autosave={self.autosave},"
f" saved_file_path_name={self.saved_file_path_name})"
)
def performance(self):
"""Tracks and reports the performance of each agent"""
performance_data = {}
for agent in self.agents:
performance_data[agent.name] = agent.get_performance_metrics()
return performance_data
def set_interaction_rules(self, rules):
"""Sets the interaction rules for each agent"""
self.interaction_rules = rules

@ -9,7 +9,7 @@ from swarms.models.base_multimodal_model import BaseMultiModalModel
from swarms.structs.agent import Agent
# Unified type for agent
agent = Union[Agent, Callable, Any, AbstractLLM, BaseMultiModalModel]
AgentType = Union[Agent, Callable, Any, AbstractLLM, BaseMultiModalModel]
# List of agents
agents = Sequence[agent]
AgentListType = Sequence[AgentType]

@ -119,13 +119,13 @@ class Status(Enum):
completed = "completed"
class Step(StepRequestBody):
class Step(BaseModel):
task_id: str = Field(
...,
description="The ID of the task this step belongs to.",
examples=["50da533e-3904-4401-8a07-c49adf88b5eb"],
)
step_id: str = Field(
step_id: int = Field(
...,
description="The ID of the task step.",
examples=["6bb1801a-fd80-45e8-899a-4dd723cc602e"],
@ -135,7 +135,6 @@ class Step(StepRequestBody):
description="The name of the task step.",
examples=["Write to file"],
)
status: Status = Field(..., description="The status of the task step.")
output: str | None = Field(
None,
description="Output of the task step.",
@ -145,12 +144,23 @@ class Step(StepRequestBody):
" <write_to_file('output.txt', 'Washington')"
],
)
additional_output: StepOutput | None = None
artifacts: list[Artifact] = Field(
[],
description="A list of artifacts that the step has produced.",
)
is_last: bool | None = Field(
False,
description="Whether this is the last step in the task.",
max_loops: int = Field(
1,
description="The maximum number of times to run the workflow.",
)
class ManySteps(BaseModel):
task_id: str = Field(
...,
description="The ID of the task this step belongs to.",
examples=["50da533e-3904-4401-8a07-c49adf88b5eb"],
)
steps: list[Step] = Field(
[],
description="A list of task steps.",
)

@ -93,36 +93,6 @@ class SequentialWorkflow:
return out
# try:
# # If the agent is a Task instance, we include the task in kwargs for Agent.run()
# # Append the task to the task_pool list
# if task:
# self.task_pool.append(task)
# logger.info(
# f"[INFO][SequentialWorkflow] Added task {task} to"
# " workflow"
# )
# elif tasks:
# for task in tasks:
# self.task_pool.append(task)
# logger.info(
# "[INFO][SequentialWorkflow] Added task"
# f" {task} to workflow"
# )
# else:
# if task and tasks is not None:
# # Add the task and list of tasks to the task_pool at the same time
# self.task_pool.append(task)
# for task in tasks:
# self.task_pool.append(task)
# except Exception as error:
# logger.error(
# colored(
# f"Error adding task to workflow: {error}", "red"
# ),
# )
def reset_workflow(self) -> None:
"""Resets the workflow by clearing the results of each task."""
try:

@ -1,176 +0,0 @@
from dataclasses import asdict
from typing import List
import networkx as nx
import redis
from redis.commands.graph import Graph, Node
from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
class SwarmRelationship:
JOINED = "joined"
class RedisSwarmRegistry(BaseSwarm):
"""
Initialize the SwarmRedisRegistry object.
Args:
host (str): The hostname or IP address of the Redis server. Default is "localhost".
port (int): The port number of the Redis server. Default is 6379.
db: The Redis database number. Default is 0.
graph_name (str): The name of the RedisGraph graph. Default is "swarm_registry".
"""
def __init__(
self,
host: str = "localhost",
port: int = 6379,
db=0,
graph_name: str = "swarm_registry",
):
self.redis = redis.StrictRedis(
host=host, port=port, db=db, decode_responses=True
)
self.redis_graph = Graph(self.redis, graph_name)
self.graph = nx.DiGraph()
def _entity_to_node(self, entity: Agent | Agent) -> Node:
"""
Converts an Agent or Swarm object to a Node object.
Args:
entity (Agent | Agent): The Agent or Swarm object to convert.
Returns:
Node: The converted Node object.
"""
return Node(
node_id=entity.id,
alias=entity.agent_name,
label=entity.agent_description,
properties=asdict(entity),
)
def _add_node(self, node: Agent | Agent):
"""
Adds a node to the graph.
Args:
node (Agent | Agent): The Agent or Swarm node to add.
"""
self.graph.add_node(node.id)
if isinstance(node, Agent):
self.add_swarm_entry(node)
elif isinstance(node, Agent):
self.add_agent_entry(node)
def _add_edge(self, from_node: Node, to_node: Node, relationship):
"""
Adds an edge between two nodes in the graph.
Args:
from_node (Node): The source node of the edge.
to_node (Node): The target node of the edge.
relationship: The relationship type between the nodes.
"""
match_query = (
f"MATCH (a:{from_node.label}),(b:{to_node.label}) WHERE"
f" a.id = {from_node.id} AND b.id = {to_node.id}"
)
query = f"""
{match_query}
CREATE (a)-[r:joined]->(b) RETURN r
""".replace(
"\n", ""
)
self.redis_graph.query(query)
def add_swarm_entry(self, swarm: Agent):
"""
Adds a swarm entry to the graph.
Args:
swarm (Agent): The swarm object to add.
"""
node = self._entity_to_node(swarm)
self._persist_node(node)
def add_agent_entry(self, agent: Agent):
"""
Adds an agent entry to the graph.
Args:
agent (Agent): The agent object to add.
"""
node = self._entity_to_node(agent)
self._persist_node(node)
def join_swarm(
self,
from_entity: Agent | Agent,
to_entity: Agent,
):
"""
Adds an edge between two nodes in the graph.
Args:
from_entity (Agent | Agent): The source entity of the edge.
to_entity (Agent): The target entity of the edge.
Returns:
Any: The result of adding the edge.
"""
from_node = self._entity_to_node(from_entity)
to_node = self._entity_to_node(to_entity)
return self._add_edge(from_node, to_node, SwarmRelationship.JOINED)
def _persist_node(self, node: Node):
"""
Persists a node in the graph.
Args:
node (Node): The node to persist.
"""
query = f"CREATE {node}"
self.redis_graph.query(query)
def retrieve_swarm_information(self, swarm_id: int) -> Agent:
"""
Retrieves swarm information from the registry.
Args:
swarm_id (int): The ID of the swarm to retrieve.
Returns:
Agent: The retrieved swarm information as an Agent object.
"""
swarm_key = f"swarm:{swarm_id}"
swarm_data = self.redis.hgetall(swarm_key)
if swarm_data:
# Parse the swarm_data and return an instance of AgentBase
# You can use the retrieved data to populate the AgentBase attributes
return Agent(**swarm_data)
return None
def retrieve_joined_agents(self) -> List[Agent]:
"""
Retrieves a list of joined agents from the registry.
Returns:
List[Agent]: The retrieved joined agents as a list of Agent objects.
"""
agent_data = self.redis_graph.query(
"MATCH (a:agent)-[:joined]->(b:manager) RETURN a"
)
if agent_data:
# Parse the agent_data and return an instance of AgentBase
# You can use the retrieved data to populate the AgentBase attributes
return [Agent(**agent_data) for agent_data in agent_data]
return None

@ -14,6 +14,13 @@ from swarms.tools.tool_utils import (
scrape_tool_func_docs,
tool_find_by_name,
)
from swarms.tools.pydantic_to_json import (
_remove_a_key,
pydantic_to_functions,
multi_pydantic_to_functions,
function_to_str,
functions_to_str,
)
__all__ = [
"scrape_tool_func_docs",
@ -31,4 +38,9 @@ __all__ = [
"preprocess_json_input",
"AgentOutputParser",
"execute_tool_by_name",
"_remove_a_key",
"pydantic_to_functions",
"multi_pydantic_to_functions",
"function_to_str",
"functions_to_str",
]

@ -0,0 +1,134 @@
from typing import Any, Optional, List
from docstring_parser import parse
from pydantic import BaseModel
def _remove_a_key(d: dict, remove_key: str) -> None:
"""Remove a key from a dictionary recursively"""
if isinstance(d, dict):
for key in list(d.keys()):
if key == remove_key and "type" in d.keys():
del d[key]
else:
_remove_a_key(d[key], remove_key)
def pydantic_to_functions(
pydantic_type: type[BaseModel],
) -> dict[str, Any]:
"""
Convert a Pydantic model to a dictionary representation of functions.
Args:
pydantic_type (type[BaseModel]): The Pydantic model type to convert.
Returns:
dict[str, Any]: A dictionary representation of the functions.
"""
schema = pydantic_type.model_json_schema()
docstring = parse(pydantic_type.__doc__ or "")
parameters = {
k: v
for k, v in schema.items()
if k not in ("title", "description")
}
for param in docstring.params:
if (name := param.arg_name) in parameters["properties"] and (
description := param.description
):
if "description" not in parameters["properties"][name]:
parameters["properties"][name]["description"] = description
parameters["type"] = "object"
if "description" not in schema:
if docstring.short_description:
schema["description"] = docstring.short_description
else:
schema["description"] = (
f"Correctly extracted `{pydantic_type.__class__.__name__.lower()}` with all "
f"the required parameters with correct types"
)
_remove_a_key(parameters, "title")
_remove_a_key(parameters, "additionalProperties")
return {
"function_call": {
"name": pydantic_type.__class__.__name__.lower(),
},
"functions": [
{
"name": pydantic_type.__class__.__name__.lower(),
"description": schema["description"],
"parameters": parameters,
},
],
}
def multi_pydantic_to_functions(
pydantic_types: List[BaseModel] = None
) -> dict[str, Any]:
"""
Converts multiple Pydantic types to a dictionary of functions.
Args:
pydantic_types (List[BaseModel]]): A list of Pydantic types to convert.
Returns:
dict[str, Any]: A dictionary containing the converted functions.
"""
functions: list[dict[str, Any]] = [
pydantic_to_functions(pydantic_type)["functions"][0]
for pydantic_type in pydantic_types
]
return {
"function_call": "auto",
"functions": functions,
}
def function_to_str(function: dict[str, Any]) -> str:
"""
Convert a function dictionary to a string representation.
Args:
function (dict[str, Any]): The function dictionary to convert.
Returns:
str: The string representation of the function.
"""
function_str = f"Function: {function['name']}\n"
function_str += f"Description: {function['description']}\n"
function_str += "Parameters:\n"
for param, details in function["parameters"]["properties"].items():
function_str += f" {param} ({details['type']}): {details.get('description', '')}\n"
return function_str
def functions_to_str(functions: list[dict[str, Any]]) -> str:
"""
Convert a list of function dictionaries to a string representation.
Args:
functions (list[dict[str, Any]]): The list of function dictionaries to convert.
Returns:
str: The string representation of the functions.
"""
functions_str = ""
for function in functions:
functions_str += function_to_str(function) + "\n"
return functions_str
Loading…
Cancel
Save