[feature][add list, dictionary, and str outputs to swarm architectures through conversation]

pull/788/head
Kye Gomez 2 months ago
parent ba605de17b
commit fe8386780f

@ -0,0 +1,9 @@
from swarms.structs.agent_builder import AgentsBuilder
example_task = "Write a blog post about the benefits of using Swarms for AI agents."
agents_builder = AgentsBuilder()
agents = agents_builder.run(example_task)
print(agents)

@ -59,7 +59,8 @@ task = """
# Run agents with tasks concurrently
swarm = ConcurrentWorkflow(
agents=agents,
return_str_on=True,
return_str_on=False,
output_type="list",
)
print(

@ -65,7 +65,7 @@ Run a single swarm with specified agents and tasks.
| agent_name | string | Required | - | Name of the agent (max 100 chars) |
| description | string | Optional | - | Description of the agent (max 500 chars) |
| system_prompt | string | Optional | - | System prompt for the agent (max 500 chars) |
| model_name | string | Optional | "gpt-4o" | Model to be used by the agent |
| model_name | string | Optional | "gpt-4o" | Model to be used by the agent (follows litellm conventions) |
| auto_generate_prompt | boolean | Optional | false | Whether to auto-generate prompts |
| max_tokens | integer | Optional | - | Maximum tokens for response |
| temperature | float | Optional | 0.5 | Temperature for response generation |

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "7.4.4"
version = "7.4.5"
description = "Swarms - TGSC"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]

@ -0,0 +1,208 @@
import os
from typing import List
from pydantic import BaseModel, Field
from swarms.structs.agent import Agent
from swarms.utils.function_caller_model import OpenAIFunctionCaller
from swarms.utils.loguru_logger import initialize_logger
from swarms.structs.agents_available import showcase_available_agents
from swarms.structs.swarms_api import AgentInput as AgentConfig
logger = initialize_logger(log_folder="auto_swarm_builder")
class Agents(BaseModel):
"""Configuration for a list of agents"""
agents: List[AgentConfig] = Field(
description="The list of agents that make up the swarm",
)
BOSS_SYSTEM_PROMPT = """
Manage a swarm of worker agents to efficiently serve the user by deciding whether to create new agents or delegate tasks. Ensure operations are efficient and effective.
### Instructions:
1. **Task Assignment**:
- Analyze available worker agents when a task is presented.
- Delegate tasks to existing agents with clear, direct, and actionable instructions if an appropriate agent is available.
- If no suitable agent exists, create a new agent with a fitting system prompt to handle the task.
2. **Agent Creation**:
- Name agents according to the task they are intended to perform (e.g., "Twitter Marketing Agent").
- Provide each new agent with a concise and clear system prompt that includes its role, objectives, and any tools it can utilize.
3. **Efficiency**:
- Minimize redundancy and maximize task completion speed.
- Avoid unnecessary agent creation if an existing agent can fulfill the task.
4. **Communication**:
- Be explicit in task delegation instructions to avoid ambiguity and ensure effective task execution.
- Require agents to report back on task completion or encountered issues.
5. **Reasoning and Decisions**:
- Offer brief reasoning when selecting or creating agents to maintain transparency.
- Avoid using an agent if unnecessary, with a clear explanation if no agents are suitable for a task.
# Output Format
Present your plan in clear, bullet-point format or short concise paragraphs, outlining task assignment, agent creation, efficiency strategies, and communication protocols.
# Notes
- Preserve transparency by always providing reasoning for task-agent assignments and creation.
- Ensure instructions to agents are unambiguous to minimize error.
"""
class AgentsBuilder:
"""A class that automatically builds and manages swarms of AI agents.
This class handles the creation, coordination and execution of multiple AI agents working
together as a swarm to accomplish complex tasks. It uses a boss agent to delegate work
and create new specialized agents as needed.
Args:
name (str): The name of the swarm
description (str): A description of the swarm's purpose
verbose (bool, optional): Whether to output detailed logs. Defaults to True.
max_loops (int, optional): Maximum number of execution loops. Defaults to 1.
"""
def __init__(
self,
name: str = None,
description: str = None,
verbose: bool = True,
max_loops: int = 1,
):
self.name = name
self.description = description
self.verbose = verbose
self.max_loops = max_loops
self.agents_pool = []
logger.info(
f"Initialized AutoSwarmBuilder: {name} {description}"
)
def run(self, task: str, image_url: str = None, *args, **kwargs):
"""Run the swarm on a given task.
Args:
task (str): The task to be accomplished
image_url (str, optional): URL of an image input if needed. Defaults to None.
*args: Variable length argument list
**kwargs: Arbitrary keyword arguments
Returns:
The output from the swarm's execution
"""
logger.info(f"Running swarm on task: {task}")
agents = self._create_agents(task, image_url, *args, **kwargs)
return agents
def _create_agents(self, task: str, *args, **kwargs):
"""Create the necessary agents for a task.
Args:
task (str): The task to create agents for
*args: Variable length argument list
**kwargs: Arbitrary keyword arguments
Returns:
list: List of created agents
"""
logger.info("Creating agents for task")
model = OpenAIFunctionCaller(
system_prompt=BOSS_SYSTEM_PROMPT,
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.1,
base_model=Agents,
)
agents_dictionary = model.run(task)
logger.info(f"Agents dictionary: {agents_dictionary}")
# Convert dictionary to SwarmConfig if needed
if isinstance(agents_dictionary, dict):
agents_dictionary = Agents(**agents_dictionary)
# Create agents from config
agents = []
for agent_config in agents_dictionary.agents:
# Convert dict to AgentConfig if needed
if isinstance(agent_config, dict):
agent_config = AgentConfig(**agent_config)
agent = self.build_agent(
agent_name=agent_config.name,
agent_description=agent_config.description,
agent_system_prompt=agent_config.system_prompt,
model_name=agent_config.model_name,
max_loops=agent_config.max_loops,
dynamic_temperature_enabled=agent_config.dynamic_temperature_enabled,
auto_generate_prompt=agent_config.auto_generate_prompt,
role=agent_config.role,
max_tokens=agent_config.max_tokens,
temperature=agent_config.temperature,
)
agents.append(agent)
# Showcasing available agents
agents_available = showcase_available_agents(
name=self.name,
description=self.description,
agents=agents,
)
for agent in agents:
agent.system_prompt += "\n" + agents_available
return agents
def build_agent(
self,
agent_name: str,
agent_description: str,
agent_system_prompt: str,
max_loops: int = 1,
model_name: str = "gpt-4o",
dynamic_temperature_enabled: bool = True,
auto_generate_prompt: bool = False,
role: str = "worker",
max_tokens: int = 8192,
temperature: float = 0.5,
):
"""Build a single agent with the given specifications.
Args:
agent_name (str): Name of the agent
agent_description (str): Description of the agent's purpose
agent_system_prompt (str): The system prompt for the agent
Returns:
Agent: The constructed agent instance
"""
logger.info(f"Building agent: {agent_name}")
agent = Agent(
agent_name=agent_name,
description=agent_description,
system_prompt=agent_system_prompt,
model_name=model_name,
max_loops=max_loops,
dynamic_temperature_enabled=dynamic_temperature_enabled,
context_length=200000,
output_type="str", # "json", "dict", "csv" OR "string" soon "yaml" and
streaming_on=False,
auto_generate_prompt=auto_generate_prompt,
role=role,
max_tokens=max_tokens,
temperature=temperature,
)
return agent

@ -14,6 +14,7 @@ import concurrent.futures
from swarms.utils.loguru_logger import initialize_logger
from swarms.structs.conversation import Conversation
from swarms.structs.swarm_id_generator import generate_swarm_id
from swarms.structs.output_type import OutputType
logger = initialize_logger(log_folder="concurrent_workflow")
@ -109,7 +110,8 @@ class ConcurrentWorkflow(BaseSwarm):
return_str_on: bool = False,
agent_responses: list = [],
auto_generate_prompts: bool = False,
max_workers: int = None,
output_type: OutputType = "dict",
return_entire_history: bool = False,
*args,
**kwargs,
):
@ -130,7 +132,9 @@ class ConcurrentWorkflow(BaseSwarm):
self.return_str_on = return_str_on
self.agent_responses = agent_responses
self.auto_generate_prompts = auto_generate_prompts
self.max_workers = max_workers or os.cpu_count()
self.max_workers = os.cpu_count()
self.output_type = output_type
self.return_entire_history = return_entire_history
self.tasks = [] # Initialize tasks list
self.reliability_check()
@ -316,6 +320,10 @@ class ConcurrentWorkflow(BaseSwarm):
elif self.return_entire_history:
return self.conversation.return_history_as_string()
elif self.output_type == "list":
return self.conversation.return_messages_as_list()
elif self.output_type == "dict":
return self.conversation.return_messages_as_dictionary()
else:
return self.output_schema.model_dump_json(indent=4)

@ -416,10 +416,19 @@ class Conversation(BaseStructure):
for message in self.conversation_history
]
def return_messages_as_dictionary(self):
return [
{
"role": message["role"],
"content": message["content"],
}
for message in self.conversation_history
]
# Example usage
conversation = Conversation()
conversation.add("user", "Hello, how are you?")
# conversation = Conversation()
# conversation.add("user", "Hello, how are you?")
# print(conversation.get_last_message_as_string())
# print(conversation.return_messages_as_list())
# conversation.add("assistant", "I am doing well, thanks.")

@ -9,6 +9,7 @@ from typing import Any, Callable, List, Optional
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.structs.multi_agent_exec import run_agents_concurrently
from swarms.structs.output_type import OutputType
from swarms.utils.formatter import formatter
from swarms.utils.loguru_logger import initialize_logger
@ -147,6 +148,7 @@ class MajorityVoting:
autosave: bool = False,
verbose: bool = False,
max_loops: int = 1,
output_type: OutputType = "dict",
*args,
**kwargs,
):
@ -158,6 +160,7 @@ class MajorityVoting:
self.autosave = autosave
self.verbose = verbose
self.max_loops = max_loops
self.output_type = output_type
self.conversation = Conversation(
time_enabled=True, *args, **kwargs
@ -202,7 +205,7 @@ class MajorityVoting:
self.conversation.add(agent.agent_name, response)
responses = self.conversation.return_history_as_string()
print(responses)
# print(responses)
prompt = f"""Conduct a detailed majority voting analysis on the following conversation:
{responses}
@ -236,8 +239,17 @@ class MajorityVoting:
)
# Return the majority vote
return self.conversation.return_history_as_string()
# return self.conversation.return_history_as_string()
if self.output_type == "str":
return self.conversation.get_str()
elif self.output_type == "dict":
return self.conversation.return_messages_as_dictionary()
elif self.output_type == "list":
return self.conversation.return_messages_as_list()
else:
return self.conversation.return_history_as_string()
def batch_run(
self, tasks: List[str], *args, **kwargs
) -> List[Any]:

@ -12,6 +12,8 @@ from swarms.prompts.ag_prompt import aggregator_system_prompt
from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.any_to_str import any_to_str
import concurrent.futures
from swarms.structs.output_type import OutputType
from swarms.structs.conversation import Conversation
logger = initialize_logger(log_folder="mixture_of_agents")
@ -69,6 +71,7 @@ class MixtureOfAgents:
layers: int = 3,
max_loops: int = 1,
return_str_on: bool = False,
output_type: OutputType = "dict",
) -> None:
"""
Initialize the Mixture of Agents class with agents and configuration.
@ -89,6 +92,7 @@ class MixtureOfAgents:
self.layers: int = layers
self.max_loops: int = max_loops
self.return_str_on: bool = return_str_on
self.output_type: OutputType = output_type
self.input_schema = MixtureOfAgentsInput(
name=name,
@ -109,6 +113,8 @@ class MixtureOfAgents:
)
self.reliability_check()
self.conversation = Conversation()
def reliability_check(self) -> None:
"""
@ -191,6 +197,7 @@ class MixtureOfAgents:
# Run the agent asynchronously
response = await asyncio.to_thread(agent.run, task)
self.output_schema.normal_agent_outputs.append(
agent.agent_output
)
@ -241,6 +248,7 @@ class MixtureOfAgents:
task (str): The task for the mixture of agents.
"""
try:
self.conversation.add("user", task)
prev_context = None
for _ in range(self.max_loops):
@ -263,8 +271,12 @@ class MixtureOfAgents:
log_agent_data(self.output_schema.model_dump())
if self.return_str_on:
return any_to_str(self.output_schema.model_dump())
if self.return_str_on or self.output_type == "str":
return self.conversation.get_str()
elif self.output_type == "dict":
return self.conversation.return_messages_as_dictionary()
elif self.output_type == "list":
return self.conversation.return_messages_as_list()
else:
return self.output_schema.model_dump_json(indent=4)

@ -0,0 +1,18 @@
from typing import Literal
# Define the output_type using Literal
OutputType = Literal[
"all",
"final",
"list",
"dict",
".json",
".md",
".txt",
".yaml",
".toml",
"str",
]
# Use the OutputType for type annotations
output_type: OutputType

@ -11,9 +11,11 @@ from swarms.schemas.agent_step_schemas import ManySteps
from swarms.structs.agent import Agent
from swarms.structs.agents_available import showcase_available_agents
from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.output_types import OutputType
from swarms.utils.loguru_logger import initialize_logger
from swarms.telemetry.main import log_agent_data
from swarms.structs.conversation import Conversation
from swarms.structs.output_type import OutputType
logger = initialize_logger(log_folder="rearrange")
@ -114,6 +116,7 @@ class AgentRearrange(BaseSwarm):
no_use_clusterops: bool = True,
autosave: bool = True,
return_entire_history: bool = False,
rules: str = None,
*args,
**kwargs,
):
@ -154,6 +157,11 @@ class AgentRearrange(BaseSwarm):
outputs=[],
)
self.conversation = Conversation()
if rules:
self.conversation.add("user", rules)
def showcase_agents(self):
# Get formatted agent info once
agents_available = showcase_available_agents(
@ -305,6 +313,8 @@ class AgentRearrange(BaseSwarm):
Exception: For any other errors during execution
"""
try:
self.conversation.add("user", task)
if not self.validate_flow():
logger.error("Flow validation failed")
return "Invalid flow configuration."
@ -394,6 +404,9 @@ class AgentRearrange(BaseSwarm):
**kwargs,
)
result = str(result)
self.conversation.add(
agent.agent_name, result
)
results.append(result)
response_dict[agent_name] = result
self.output_schema.outputs.append(
@ -446,6 +459,9 @@ class AgentRearrange(BaseSwarm):
**kwargs,
)
current_task = str(current_task)
self.conversation.add(
agent.agent_name, current_task
)
response_dict[agent_name] = current_task
self.output_schema.outputs.append(
agent.agent_output
@ -475,7 +491,7 @@ class AgentRearrange(BaseSwarm):
elif self.output_type == "list":
output = all_responses
elif self.output_type == "dict":
output = response_dict
output = self.conversation.return_messages_as_dictionary()
else: # "final"
output = current_task

@ -1,7 +1,7 @@
from typing import List, Optional
from swarms.structs.agent import Agent
from swarms.structs.rearrange import AgentRearrange
from swarms.structs.output_types import OutputType
from swarms.structs.output_type import OutputType
from concurrent.futures import ThreadPoolExecutor, as_completed
from swarms.utils.loguru_logger import initialize_logger
@ -30,7 +30,7 @@ class SequentialWorkflow:
description: str = "Sequential Workflow, where agents are executed in a sequence.",
agents: List[Agent] = [],
max_loops: int = 1,
output_type: OutputType = "all",
output_type: OutputType = "dict",
return_json: bool = False,
shared_memory_system: callable = None,
return_entire_history: bool = False,
@ -136,7 +136,7 @@ class SequentialWorkflow:
"""
try:
return self.agent_rearrange.run(
result = self.agent_rearrange.run(
task=task,
img=img,
device=device,
@ -144,9 +144,20 @@ class SequentialWorkflow:
device_id=device_id,
all_gpus=all_gpus,
no_use_clusterops=no_use_clusterops,
output_type=self.output_type,
*args,
**kwargs,
)
if self.output_type == "dict":
result = self.agent_rearrange.conversation.return_messages_as_dictionary()
elif self.output_type == "list":
result = self.agent_rearrange.conversation.return_messages_as_list()
elif self.output_type == "str" or self.return_json:
result = self.agent_rearrange.conversation.get_str()
else:
return result
except Exception as e:
logger.error(
f"An error occurred while executing the task: {e}"

@ -18,6 +18,7 @@ from swarms.structs.rearrange import AgentRearrange
from swarms.structs.sequential_workflow import SequentialWorkflow
from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm
from swarms.structs.swarm_matcher import swarm_matcher
from swarms.structs.output_type import OutputType
from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger(log_folder="swarm_router")
@ -146,7 +147,7 @@ class SwarmRouter:
shared_memory_system: Any = None,
rules: str = None,
documents: List[str] = [], # A list of docs file paths
output_type: str = "all",
output_type: OutputType = "dict",
no_cluster_ops: bool = False,
speaker_fn: callable = None,
load_agents_from_csv: bool = False,
@ -323,6 +324,7 @@ class SwarmRouter:
aggregator_system_prompt=aggregator_system_prompt.get_prompt(),
aggregator_agent=self.agents[-1],
layers=self.max_loops,
output_type=self.output_type,
*args,
**kwargs,
)
@ -386,6 +388,7 @@ class SwarmRouter:
max_loops=self.max_loops,
auto_save=self.autosave,
return_str_on=self.return_entire_history,
output_type=self.output_type,
*args,
**kwargs,
)

@ -53,6 +53,10 @@ class AgentInput(BaseModel):
1,
description="The maximum number of iterations the agent is allowed to perform.",
)
dynamic_temperature_enabled: Optional[bool] = Field(
True,
description="Indicates whether the agent should use dynamic temperature.",
)
class SwarmRequest(BaseModel):

@ -1,3 +1,5 @@
# tools - search, code executor, create api
import os
import requests
from dotenv import load_dotenv
@ -25,30 +27,33 @@ def run_single_swarm():
"agent_name": "Market Analyst",
"description": "Analyzes market trends",
"system_prompt": "You are a financial analyst expert.",
"model_name": "gpt-4o",
"model_name": "groq/deepseek-r1-distill-qwen-32b",
"role": "worker",
"max_loops": 1,
"max_tokens": 8192,
},
{
"agent_name": "Economic Forecaster",
"description": "Predicts economic trends",
"system_prompt": "You are an expert in economic forecasting.",
"model_name": "gpt-4o",
"model_name": "groq/deepseek-r1-distill-qwen-32b",
"role": "worker",
"max_loops": 1,
"max_tokens": 8192,
},
{
"agent_name": "Data Scientist",
"description": "Performs data analysis",
"system_prompt": "You are a data science expert.",
"model_name": "gpt-4o",
"model_name": "groq/deepseek-r1-distill-qwen-32b",
"role": "worker",
"max_loops": 1,
"max_tokens": 8192,
},
],
"max_loops": 1,
"swarm_type": "ConcurrentWorkflow",
"task": "Analyze current market trends in tech sector",
"task": "What are the best etfs and index funds for ai and tech?",
"output_type": "str",
"return_history": True,
}
@ -77,7 +82,3 @@ if __name__ == "__main__":
result = run_single_swarm()
print("Swarm Result:")
print(result)
# print("Swarm Logs:")
# logs = get_logs()
# print(logs)

Loading…
Cancel
Save