You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
swarms/ant_swarm.py

208 lines
6.9 KiB

3 months ago
import os
import time
from typing import List, Dict, Any, Union
from pydantic import BaseModel, Field
from loguru import logger
from swarms import Agent
from swarm_models import OpenAIChat
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Pydantic model to track metadata for each agent
class AgentMetadata(BaseModel):
agent_id: str
start_time: float = Field(default_factory=time.time)
end_time: Union[float, None] = None
task: str
output: Union[str, None] = None
error: Union[str, None] = None
status: str = "running"
# Worker Agent class
class WorkerAgent:
def __init__(
self,
agent_name: str,
system_prompt: str,
model_name: str = "gpt-4o-mini",
):
"""Initialize a Worker agent with its own model, name, and system prompt."""
api_key = os.getenv("OPENAI_API_KEY")
# Create the LLM model for the worker
self.model = OpenAIChat(
openai_api_key=api_key,
model_name=model_name,
temperature=0.1,
)
# Initialize the worker agent with a unique prompt and name
self.agent = Agent(
agent_name=agent_name,
system_prompt=system_prompt,
llm=self.model,
max_loops=1,
autosave=True,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path=f"{agent_name}_state.json",
user_name="swarms_corp",
retry_attempts=1,
context_length=200000,
return_step_meta=False,
)
def perform_task(self, task: str) -> Dict[str, Any]:
"""Perform the task assigned by the Queen and return the result."""
metadata = AgentMetadata(
agent_id=self.agent.agent_name, task=task
)
try:
logger.info(
f"{self.agent.agent_name} is starting task '{task}'."
)
result = self.agent.run(task)
metadata.output = result
metadata.status = "completed"
except Exception as e:
logger.error(
f"{self.agent.agent_name} encountered an error: {e}"
)
metadata.error = str(e)
metadata.status = "failed"
finally:
metadata.end_time = time.time()
return metadata.dict()
# Queen Agent class to manage the workers and dynamically decompose tasks
class QueenAgent:
def __init__(
self,
worker_count: int = 5,
model_name: str = "gpt-4o-mini",
queen_name: str = "Queen-Agent",
queen_prompt: str = "You are the queen of the hive",
):
"""Initialize the Queen agent who assigns tasks to workers.
Args:
worker_count (int): Number of worker agents to manage.
model_name (str): The model used by worker agents.
queen_name (str): The name of the Queen agent.
queen_prompt (str): The system prompt for the Queen agent.
"""
self.queen_name = queen_name
self.queen_prompt = queen_prompt
# Queen agent initialization with a unique prompt for dynamic task decomposition
api_key = os.getenv("OPENAI_API_KEY")
self.queen_model = OpenAIChat(
openai_api_key=api_key,
model_name=model_name,
temperature=0.1,
)
# Initialize worker agents
self.workers = [
WorkerAgent(
agent_name=f"Worker-{i+1}",
system_prompt=f"Worker agent {i+1}, specialized in helping with financial analysis.",
model_name=model_name,
)
for i in range(worker_count)
]
self.worker_metadata: Dict[str, AgentMetadata] = {}
def decompose_task(self, task: str) -> List[str]:
"""Dynamically decompose a task into multiple subtasks using prompting."""
decomposition_prompt = f"""You are a highly efficient problem solver. Given the following task:
'{task}', please decompose this task into 3-5 smaller subtasks, and explain how they can be completed step by step."""
logger.info(
f"{self.queen_name} is generating subtasks using prompting for the task: '{task}'"
)
# Use the queen's model to generate subtasks dynamically
subtasks_output = self.queen_model.run(decomposition_prompt)
logger.info(f"Queen output: {subtasks_output}")
subtasks = subtasks_output.split("\n")
# Filter and clean up subtasks
subtasks = [
subtask.strip() for subtask in subtasks if subtask.strip()
]
return subtasks
def assign_subtasks(self, subtasks: List[str]) -> Dict[str, Any]:
"""Assign subtasks to workers dynamically and collect their results.
Args:
subtasks (List[str]): The list of subtasks to distribute among workers.
Returns:
dict: A dictionary containing results from workers.
"""
logger.info(
f"{self.queen_name} is assigning subtasks to workers."
)
results = {}
for i, subtask in enumerate(subtasks):
# Assign each subtask to a different worker
worker = self.workers[
i % len(self.workers)
] # Circular assignment if more subtasks than workers
worker_result = worker.perform_task(subtask)
results[worker_result["agent_id"]] = worker_result
self.worker_metadata[worker_result["agent_id"]] = (
worker_result
)
return results
def gather_results(self) -> Dict[str, Any]:
"""Gather all results from the worker agents."""
return self.worker_metadata
def run_swarm(self, task: str) -> Dict[str, Any]:
"""Run the swarm by decomposing a task into subtasks, assigning them to workers, and gathering results."""
logger.info(f"{self.queen_name} is initializing the swarm.")
# Decompose the task into subtasks using prompting
subtasks = self.decompose_task(task)
logger.info(f"Subtasks generated by the Queen: {subtasks}")
# Assign subtasks to workers
results = self.assign_subtasks(subtasks)
logger.info(
f"{self.queen_name} has collected results from all workers."
)
return results
# Example usage
if __name__ == "__main__":
# Queen oversees 3 worker agents with a custom system prompt
queen = QueenAgent(
worker_count=3,
queen_name="Queen-Overseer",
queen_prompt="You are the overseer queen of a financial analysis swarm. Decompose and distribute tasks wisely.",
)
# Task for the swarm to execute
task = "Analyze the best strategies to establish a ROTH IRA and maximize tax savings."
# Run the swarm on the task and gather results
final_results = queen.run_swarm(task)
print("Final Swarm Results:", final_results)