[EXPERIMENTAL]

pull/584/head
Your Name 4 months ago
parent bf1265b3f0
commit 93b6ee71f4

@ -5,7 +5,7 @@ from swarm_models import OpenAIChat
from swarms.prompts.finance_agent_sys_prompt import ( from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT, FINANCIAL_AGENT_SYS_PROMPT,
) )
from swarms.structs.a_star_swarm import AStarSwarm from examples.structs.swarms.experimental.a_star_swarm import AStarSwarm
# Set up the model as provided # Set up the model as provided
api_key = os.getenv("OPENAI_API_KEY") api_key = os.getenv("OPENAI_API_KEY")

@ -2,7 +2,7 @@ import os
from swarms import Agent from swarms import Agent
from swarm_models import OpenAIChat from swarm_models import OpenAIChat
from swarms.structs.dfs_search_swarm import DFSSwarm from examples.structs.swarms.experimental.dfs_search_swarm import DFSSwarm
# Get the OpenAI API key from the environment variable # Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY") api_key = os.getenv("OPENAI_API_KEY")

@ -1,214 +0,0 @@
import concurrent.futures
from typing import List, Union
from loguru import logger
from pydantic import BaseModel
from swarms.structs.agent import Agent
from swarms.schemas.agent_step_schemas import ManySteps
class AgentRowMetadata(BaseModel):
row_index: int
agent_runs: List[ManySteps]
class AgentMatrixMetadata(BaseModel):
matrix_runs: List[AgentRowMetadata]
class AgentMatrix:
def __init__(
self, agents: Union[List["Agent"], List[List["Agent"]]]
):
"""
Initializes the matrix with the provided list of agents or list of lists of agents.
Args:
agents (List[Agent] or List[List[Agent]]): A list of agents or a list of lists of agents (matrix).
"""
if isinstance(agents[0], list):
self.agents_matrix: List[List["Agent"]] = (
agents # List of lists (matrix)
)
self.rows: int = len(agents)
self.cols: int = len(agents[0]) if agents else 0
else:
self.agents_matrix: List[List["Agent"]] = [
agents
] # Single row of agents (1D list)
self.rows: int = 1
self.cols: int = len(agents)
# Store metadata for all runs
self.matrix_metadata = AgentMatrixMetadata(matrix_runs=[])
logger.info(
f"AgentMatrix initialized with {self.rows} rows and {self.cols} columns of agents."
)
def execute_in_order(self, query: str) -> None:
"""Executes the agents in row-major order."""
logger.info(
f"Executing all agents in row-major order with query: {query}"
)
for i, row in enumerate(self.agents_matrix):
row_metadata = AgentRowMetadata(
row_index=i, agent_runs=[]
)
for j, agent in enumerate(row):
logger.info(f"Executing Agent [{i}][{j}]")
out = agent.run(query)
logger.info(f"Output from Agent [{i}][{j}]: {out}")
agent_metadata = agent.agent_output
row_metadata.agent_runs.append(agent_metadata)
self.matrix_metadata.matrix_runs.append(row_metadata)
def execute_by_row(
self, row_index: int, query: str, sequential: bool = True
) -> None:
"""
Executes all agents in a specific row, either sequentially or concurrently.
Args:
row_index (int): The index of the row to execute.
query (str): The query to run.
sequential (bool): Whether to execute agents sequentially (True) or concurrently (False).
"""
if not (0 <= row_index < self.rows):
logger.error(f"Invalid row index: {row_index}")
return
logger.info(
f"Executing row {row_index} with query: {query}. Sequential: {sequential}"
)
row_metadata = AgentRowMetadata(
row_index=row_index, agent_runs=[]
)
if sequential:
self._execute_row_sequentially(
row_index, query, row_metadata
)
else:
self._execute_row_concurrently(
row_index, query, row_metadata
)
self.matrix_metadata.matrix_runs.append(row_metadata)
def _execute_row_sequentially(
self,
row_index: int,
query: str,
row_metadata: AgentRowMetadata,
) -> None:
"""Executes agents in a row sequentially, passing output from one agent to the next."""
logger.info(
f"Executing agents in row {row_index} sequentially."
)
current_input = query
for j, agent in enumerate(self.agents_matrix[row_index]):
logger.info(
f"Executing Agent [{row_index}][{j}] sequentially with input: {current_input}"
)
current_output = agent.run(current_input)
agent_metadata = agent.agent_output
logger.info(
f"Output from Agent [{row_index}][{j}]: {current_output}"
)
row_metadata.agent_runs.append(agent_metadata)
current_input = current_output
def _execute_row_concurrently(
self,
row_index: int,
query: str,
row_metadata: AgentRowMetadata,
) -> None:
"""Executes agents in a row concurrently."""
logger.info(
f"Executing agents in row {row_index} concurrently."
)
def agent_task(agent, query):
return agent.run(query)
with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_agent = {
executor.submit(agent_task, agent, query): agent
for agent in self.agents_matrix[row_index]
}
for future in concurrent.futures.as_completed(
future_to_agent
):
agent = future_to_agent[future]
try:
output = future.result()
logger.info(
f"Output from concurrent agent: {output}"
)
# Capture metadata
agent_metadata = agent.agent_output
row_metadata.agent_runs.append(agent_metadata)
except Exception as exc:
logger.error(
f"Agent generated an exception: {exc}"
)
def execute_by_column(self, col_index: int, query: str) -> None:
"""Executes all agents in a specific column."""
if not (0 <= col_index < self.cols):
logger.error(f"Invalid column index: {col_index}")
return
logger.info(
f"Executing column {col_index} with query: {query}"
)
for i in range(self.rows):
logger.info(f"Executing Agent [{i}][{col_index}]")
out = self.agents_matrix[i][col_index].run(query)
logger.info(
f"Output from Agent [{i}][{col_index}]: {out}"
)
# Capture metadata for the column run
row_metadata = AgentRowMetadata(
row_index=i, agent_runs=[]
)
agent_metadata = self.agents_matrix[i][
col_index
].agent_output
row_metadata.agent_runs.append(agent_metadata)
self.matrix_metadata.matrix_runs.append(row_metadata)
def export_metadata(self) -> str:
"""Exports the metadata to a JSON format."""
logger.info("Exporting metadata to JSON.")
return self.matrix_metadata.json(indent=4)
# Example usage with pre-created agents
# # Assuming you have pre-created agents, here's an example:
# # agent_1, agent_2, ..., agent_n are instances of the `Agent` class
# agents_row_1 = [agent_1, agent_2, agent_3]
# agents_row_2 = [agent_4, agent_5, agent_6]
# agents_row_3 = [agent_7, agent_8, agent_9]
# # Matrix of agents (list of lists)
# agents_matrix = [agents_row_1, agents_row_2, agents_row_3]
# # Initialize the AgentMatrix with the list of lists
# agent_matrix = AgentMatrix(agents_matrix)
# # Execute all agents in row 1 sequentially (output of one agent passed to the next)
# agent_matrix.execute_by_row(1, "What is the process for getting a ROTH IRA started?", sequential=True)
# # Execute all agents in row 1 concurrently (all agents run independently)
# agent_matrix.execute_by_row(1, "What is the process for getting a ROTH IRA started?", sequential=False)
# # Export and print the run metadata in JSON format
# metadata_json = agent_matrix.export_metadata()
# print(metadata_json)

@ -1,629 +0,0 @@
import os
import random
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from threading import Lock
from typing import Any, Callable, Dict, List, Optional
from loguru import logger
from pydantic import BaseModel, Field
from swarms import Agent, create_file_in_folder
from swarms.schemas.agent_step_schemas import ManySteps
class MultiAgentCollaborationSchema(BaseModel):
name: str = Field(..., title="Name of the collaboration")
description: str = Field(
..., title="Description of the collaboration"
)
agent_outputs: List[ManySteps] = Field(
..., title="List of agent outputs"
)
timestamp: str = Field(
default_factory=lambda: time.strftime("%Y-%m-%d %H:%M:%S"),
title="Timestamp of the collaboration",
)
number_of_agents: int = Field(
..., title="Number of agents in the collaboration"
)
class Cache:
def __init__(self, expiration_time: Optional[timedelta] = None):
"""
Initializes the cache.
:param expiration_time: Time after which a cache entry should expire.
"""
self.cache: Dict[str, Dict[str, Any]] = {}
self.expiration_time = expiration_time
self.lock = Lock()
def set(self, key: str, value: Any):
"""
Stores a value in the cache with an optional expiration time.
:param key: Cache key.
:param value: Value to store in the cache.
"""
with self.lock:
expiry = (
datetime.utcnow() + self.expiration_time
if self.expiration_time
else None
)
self.cache[key] = {"value": value, "expiry": expiry}
logger.debug(
f"Cache set for key '{key}' with expiry {expiry}"
)
def get(self, key: str) -> Optional[Any]:
"""
Retrieves a value from the cache.
:param key: Cache key.
:return: Cached value if available and not expired, else None.
"""
with self.lock:
if key in self.cache:
entry = self.cache[key]
if (
entry["expiry"]
and entry["expiry"] < datetime.utcnow()
):
logger.debug(f"Cache expired for key '{key}'")
del self.cache[key]
return None
logger.debug(f"Cache hit for key '{key}'")
return entry["value"]
logger.debug(f"Cache miss for key '{key}'")
return None
def random_selector(agents: List[Agent], iteration: int) -> Agent:
"""
Selects a random agent.
:param agents: List of agents to select from.
:param iteration: The current iteration number (unused).
:return: A randomly selected agent.
"""
return random.choice(agents)
def first_agent_selector(
agents: List[Agent], iteration: int
) -> Agent:
"""
Always selects the first agent in the list.
:param agents: List of agents to select from.
:param iteration: The current iteration number (unused).
:return: The first agent in the list.
"""
return agents[0]
def last_agent_selector(agents: List[Agent], iteration: int) -> Agent:
"""
Always selects the last agent in the list.
:param agents: List of agents to select from.
:param iteration: The current iteration number (unused).
:return: The last agent in the list.
"""
return agents[-1]
def reverse_round_robin_selector(
agents: List[Agent], iteration: int
) -> Agent:
"""
Selects agents in reverse round-robin order.
:param agents: List of agents to select from.
:param iteration: The current iteration number.
:return: The agent selected in reverse round-robin order.
"""
index = -((iteration % len(agents)) + 1)
return agents[index]
def even_iteration_selector(
agents: List[Agent], iteration: int
) -> Agent:
"""
Selects the agent based on even iteration; defaults to the first agent if odd.
:param agents: List of agents to select from.
:param iteration: The current iteration number.
:return: The selected agent based on even iteration.
"""
return (
agents[iteration % len(agents)]
if iteration % 2 == 0
else agents[0]
)
def odd_iteration_selector(
agents: List[Agent], iteration: int
) -> Agent:
"""
Selects the agent based on odd iteration; defaults to the last agent if even.
:param agents: List of agents to select from.
:param iteration: The current iteration number.
:return: The selected agent based on odd iteration.
"""
return (
agents[iteration % len(agents)]
if iteration % 2 != 0
else agents[-1]
)
def weighted_random_selector(
agents: List[Agent], iteration: int
) -> Agent:
"""
Selects an agent based on weighted random choice, with the first agent having a higher weight.
:param agents: List of agents to select from.
:param iteration: The current iteration number (unused).
:return: A randomly selected agent with weighted preference.
"""
weights = [1] * len(agents)
weights[0] = 2 # Give the first agent higher weight
return random.choices(agents, weights=weights, k=1)[0]
def increasing_weight_selector(
agents: List[Agent], iteration: int
) -> Agent:
"""
Selects an agent based on increasing weight with iteration (favoring later agents).
:param agents: List of agents to select from.
:param iteration: The current iteration number (unused).
:return: A randomly selected agent with increasing weight.
"""
weights = [i + 1 for i in range(len(agents))]
return random.choices(agents, weights=weights, k=1)[0]
def decreasing_weight_selector(
agents: List[Agent], iteration: int
) -> Agent:
"""
Selects an agent based on decreasing weight with iteration (favoring earlier agents).
:param agents: List of agents to select from.
:param iteration: The current iteration number (unused).
:return: A randomly selected agent with decreasing weight.
"""
weights = [len(agents) - i for i in range(len(agents))]
return random.choices(agents, weights=weights, k=1)[0]
def round_robin_with_skip_selector(
agents: List[Agent], iteration: int
) -> Agent:
"""
Selects agents in a round-robin fashion but skips every third agent.
:param agents: List of agents to select from.
:param iteration: The current iteration number.
:return: The selected agent with a skipping pattern.
"""
index = (iteration * 2) % len(agents)
return agents[index]
def priority_selector(
agents: List[Agent], iteration: int, priority_index: int = 0
) -> Agent:
"""
Selects an agent based on a priority index, always selecting the agent at the given index.
:param agents: List of agents to select from.
:param iteration: The current iteration number (unused).
:param priority_index: The index of the agent with priority.
:return: The agent at the priority index.
"""
return agents[priority_index]
def dynamic_priority_selector(
agents: List[Agent], iteration: int, priorities: List[int] = None
) -> Agent:
"""
Selects an agent based on dynamic priorities, which can change over iterations.
:param agents: List of agents to select from.
:param iteration: The current iteration number.
:param priorities: A list of priorities for each agent, determining their selection likelihood.
:return: The selected agent based on dynamic priorities.
"""
if priorities is None:
priorities = [1] * len(agents)
index = random.choices(
range(len(agents)), weights=priorities, k=1
)[0]
return agents[index]
def alternating_selector(
agents: List[Agent], iteration: int
) -> Agent:
"""
Alternates between the first and last agent.
:param agents: List of agents to select from.
:param iteration: The current iteration number.
:return: The first agent if the iteration is even, the last if odd.
"""
return agents[0] if iteration % 2 == 0 else agents[-1]
def middle_agent_selector(
agents: List[Agent], iteration: int
) -> Agent:
"""
Always selects the middle agent.
:param agents: List of agents to select from.
:param iteration: The current iteration number (unused).
:return: The middle agent in the list.
"""
index = len(agents) // 2
return agents[index]
def weighted_round_robin_selector(
agents: List[Agent], iteration: int, weights: List[int] = None
) -> Agent:
"""
Selects agents in a weighted round-robin fashion.
:param agents: List of agents to select from.
:param iteration: The current iteration number.
:param weights: A list of weights to determine the likelihood of selection.
:return: The selected agent based on weighted round-robin.
"""
if weights is None:
weights = [1] * len(agents)
index = random.choices(range(len(agents)), weights=weights, k=1)[
0
]
return agents[index]
def even_odd_priority_selector(
agents: List[Agent], iteration: int
) -> Agent:
"""
Gives priority to even-indexed agents on even iterations and odd-indexed agents on odd iterations.
:param agents: List of agents to select from.
:param iteration: The current iteration number.
:return: The selected agent based on even/odd priority.
"""
even_agents = agents[::2]
odd_agents = agents[1::2]
return (
random.choice(even_agents)
if iteration % 2 == 0
else random.choice(odd_agents)
)
def reverse_selector(agents: List[Agent], iteration: int) -> Agent:
"""
Selects agents in reverse order starting from the last agent.
:param agents: List of agents to select from.
:param iteration: The current iteration number.
:return: The agent selected in reverse order.
"""
return agents[-(iteration % len(agents)) - 1]
def frequent_first_selector(
agents: List[Agent], iteration: int, frequency: int = 3
) -> Agent:
"""
Frequently selects the first agent every 'n' iterations, otherwise selects in round-robin.
:param agents: List of agents to select from.
:param iteration: The current iteration number.
:param frequency: The frequency of selecting the first agent.
:return: The selected agent with frequent first preference.
"""
if iteration % frequency == 0:
return agents[0]
return agents[iteration % len(agents)]
def frequent_last_selector(
agents: List[Agent], iteration: int, frequency: int = 3
) -> Agent:
"""
Frequently selects the last agent every 'n' iterations, otherwise selects in round-robin.
:param agents: List of agents to select from.
:param iteration: The current iteration number.
:param frequency: The frequency of selecting the last agent.
:return: The selected agent with frequent last preference.
"""
if iteration % frequency == 0:
return agents[-1]
return agents[iteration % len(agents)]
def random_skip_selector(
agents: List[Agent], iteration: int, skip_probability: float = 0.5
) -> Agent:
"""
Randomly skips agents with a given probability, selecting the next in line.
:param agents: List of agents to select from.
:param iteration: The current iteration number.
:param skip_probability: The probability of skipping an agent.
:return: The selected agent with random skips.
"""
while random.random() < skip_probability:
iteration += 1
return agents[iteration % len(agents)]
def adaptive_selector(
agents: List[Agent],
iteration: int,
performance_metric: Callable[[Agent], float] = None,
) -> Agent:
"""
Selects the agent based on a performance metric, favoring better-performing agents.
:param agents: List of agents to select from.
:param iteration: The current iteration number.
:param performance_metric: A function to determine the performance of each agent.
:return: The selected agent based on adaptive performance.
"""
if performance_metric is None:
def performance_metric(agent):
return (
random.random()
) # Default random performance metric
performance_scores = [
performance_metric(agent) for agent in agents
]
best_agent_index = performance_scores.index(
max(performance_scores)
)
return agents[best_agent_index]
class MultiAgentCollaboration:
"""
Initializes the MultiAgentCollaboration.
:param agents: List of Agent instances.
:param speaker_fn: Function to select the agent for each loop.
:param max_loops: Maximum number of iterations.
:param use_cache: Boolean to enable or disable caching.
:param autosave_on: Boolean to enable or disable autosaving the output.
"""
def __init__(
self,
name: str = "MultiAgentCollaboration",
description: str = "A collaboration of multiple agents",
agents: List[Agent] = [],
speaker_fn: Callable[[List[Agent], int], Agent] = [],
max_loops: int = 1,
use_cache: bool = True,
autosave_on: bool = True,
):
self.name = name
self.description = description
self.agents = agents
self.speaker_fn = speaker_fn
self.max_loops = max_loops
self.autosave_on = autosave_on
self.lock = Lock()
self.max_workers = os.cpu_count()
self.use_cache = use_cache
logger.info(
f"Initialized MultiAgentCollaboration with {len(agents)} agents and max_loops={max_loops}"
)
# Cache
self.cache = Cache(expiration_time=timedelta(minutes=5))
# Output schema
self.output_schema = MultiAgentCollaborationSchema(
name=name,
description=description,
agent_outputs=[],
number_of_agents=len(agents),
)
def _execute_agent(self, agent: Agent, task: str, loop: int):
"""
Executes an agent's run method and records the output.
:param agent: The Agent instance to execute.
:param task: The input prompt for the agent.
:param loop: Current loop iteration.
"""
logger.debug(
f"Executing agent '{agent.agent_name}' on loop {loop}"
)
output = agent.run(task)
agent_output = agent.agent_output
self.output_schema.agent_outputs.append(agent_output)
return output
def run(self, task: str, *args, **kwargs):
"""
Runs the agents in sequence, passing the output of one as the input to the next.
:param task: The input prompt to pass to each agent.
:return: The final output of the last agent.
"""
logger.info("Starting MultiAgentCollaboration run.")
current_task = task
with ThreadPoolExecutor(
max_workers=self.max_workers
) as executor:
for i in range(self.max_loops):
selected_agent = self.speaker_fn(self.agents, i)
logger.debug(
f"Loop {i}: Selected agent '{selected_agent.agent_name}'"
)
future = executor.submit(
self._execute_agent,
selected_agent,
current_task,
i,
*args,
**kwargs,
)
try:
current_task = (
future.result()
) # The output of this agent becomes the input for the next
except Exception as exc:
logger.error(
f"Loop {i} generated an exception: {exc}"
)
break
logger.info("Completed MultiAgentCollaboration run.")
if self.autosave_on is True:
self.save_file()
return self.return_output_schema_json()
def save_file(self):
time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
create_file_in_folder(
"multi_agent_collab_folder",
f"{self.name}_time_{time}_output.json",
self.return_output_schema_json(),
)
def get_outputs_dict(self):
"""
Retrieves all recorded agent outputs as a list of dictionaries.
:return: List of dictionaries representing AgentOutput instances.
"""
return self.output_schema.model_dump()
def return_output_schema_json(self):
return self.output_schema.model_dump_json(indent=4)
def round_robin_speaker(agents: List[Agent], iteration: int) -> Agent:
"""
Selects an agent from the given list of agents using round-robin scheduling.
Args:
agents (List[Agent]): The list of agents to select from.
iteration (int): The current iteration number.
Returns:
Agent: The selected agent.
"""
selected = agents[iteration % len(agents)]
logger.debug(
f"Round-robin selected agent '{selected.agent_name}' for iteration {iteration}"
)
return selected
# # Example usage
# if __name__ == "__main__":
# from swarms import OpenAIChat
# from swarms.prompts.finance_agent_sys_prompt import (
# FINANCIAL_AGENT_SYS_PROMPT,
# )
# # Get the OpenAI API key from the environment variable
# api_key = os.getenv("OPENAI_API_KEY")
# if not api_key:
# logger.error("OpenAI API key not found in environment variables.")
# exit(1)
# # Create instances of the OpenAIChat class
# model = OpenAIChat(
# api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
# )
# # Initialize agents
# agent1 = Agent(
# agent_name="Financial-Analysis-Agent_1",
# system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
# llm=model,
# max_loops=1,
# dynamic_temperature_enabled=True,
# saved_state_path="finance_agent_1.json",
# user_name="swarms_corp",
# retry_attempts=1,
# context_length=200000,
# return_step_meta=False,
# )
# agent2 = Agent(
# agent_name="Financial-Analysis-Agent_2",
# system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
# llm=model,
# max_loops=1,
# dynamic_temperature_enabled=True,
# saved_state_path="finance_agent_2.json",
# user_name="swarms_corp",
# retry_attempts=1,
# context_length=200000,
# return_step_meta=False,
# )
# agent2 = Agent(
# agent_name="Financial-Analysis-Agent_3",
# system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
# llm=model,
# max_loops=1,
# dynamic_temperature_enabled=True,
# saved_state_path="finance_agent_2.json",
# user_name="swarms_corp",
# retry_attempts=1,
# context_length=200000,
# return_step_meta=False,
# )
# # Initialize the MultiAgentCollaboration with the round-robin speaker function
# multi_agent_framework = MultiAgentCollaboration(
# agents=[agent1, agent2],
# speaker_fn=round_robin_speaker,
# max_loops=3,
# use_cache=True, # Enable caching
# autosave_on=True,
# )
# # Run the framework with an input prompt
# task = "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria"
# out = multi_agent_framework.run(task)
# print(out)
# print(multi_agent_framework.return_output_schema_json())

@ -1,242 +0,0 @@
import os
import sys
import datetime
from typing import List, Dict, Any, Optional
from swarms import Agent
from swarm_models import OpenAIChat
from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT,
)
from pulsar import Client, Producer
from pydantic import BaseModel, Field
from loguru import logger
# Configure Loguru logger
logger.remove()
logger.add(sys.stderr, level="INFO")
logger.add("swarm_logs.log", rotation="10 MB", level="DEBUG")
# Apache Pulsar configuration
PULSAR_SERVICE_URL = os.getenv(
"PULSAR_SERVICE_URL", "pulsar://localhost:6650"
)
# Define Pydantic schemas for structured output
class AgentOutputMetadata(BaseModel):
agent_name: str
task: str
timestamp: datetime.datetime
status: str
class AgentOutputData(BaseModel):
output: str
additional_info: Optional[Dict[str, Any]] = None
class AgentOutputSchema(BaseModel):
metadata: AgentOutputMetadata
data: AgentOutputData
class SwarmOutputSchema(BaseModel):
results: List[AgentOutputSchema] = Field(default_factory=list)
# SwarmManager class to manage agents and tasks
class SwarmManager:
def __init__(
self,
agents: List[Agent],
pulsar_service_url: str = PULSAR_SERVICE_URL,
):
"""
Initializes the SwarmManager with a list of agents and Pulsar service URL.
:param agents: List of Agent instances.
:param pulsar_service_url: URL of the Apache Pulsar service.
"""
self.agents = agents
self.pulsar_service_url = pulsar_service_url
self.client: Optional[Client] = None
self.producers: Dict[str, Producer] = {}
self.swarm_results = SwarmOutputSchema()
def connect_pulsar(self) -> None:
"""
Establishes connection to the Apache Pulsar service.
"""
try:
self.client = Client(
self.pulsar_service_url, operation_timeout_seconds=30
)
logger.info(
f"Connected to Pulsar service at {self.pulsar_service_url}"
)
except Exception as e:
logger.error(f"Failed to connect to Pulsar service: {e}")
raise
def initialize_producers(self) -> None:
"""
Initializes Pulsar producers for each agent.
"""
if not self.client:
logger.error("Pulsar client is not connected.")
raise ConnectionError("Pulsar client is not connected.")
for agent in self.agents:
try:
topic = f"{agent.agent_name}_topic"
producer = self.client.create_producer(topic)
self.producers[agent.agent_name] = producer
logger.debug(
f"Initialized producer for agent {agent.agent_name} on topic {topic}"
)
except Exception as e:
logger.error(
f"Failed to create producer for agent {agent.agent_name}: {e}"
)
raise
def run_task(self, agent: Agent, task: str) -> AgentOutputSchema:
"""
Executes a task using the specified agent and returns the structured output.
:param agent: The Agent instance to execute the task.
:param task: The task string to be executed.
:return: AgentOutputSchema containing the result and metadata.
"""
logger.info(
f"Agent {agent.agent_name} is starting task: {task}"
)
timestamp = datetime.datetime.utcnow()
try:
output = agent.run(task)
status = "Success"
logger.info(
f"Agent {agent.agent_name} completed task successfully."
)
except Exception as e:
output = str(e)
status = "Failed"
logger.error(
f"Agent {agent.agent_name} failed to complete task: {e}"
)
metadata = AgentOutputMetadata(
agent_name=agent.agent_name,
task=task,
timestamp=timestamp,
status=status,
)
data = AgentOutputData(output=output)
agent_output = AgentOutputSchema(metadata=metadata, data=data)
# Publish result to Pulsar topic
try:
producer = self.producers.get(agent.agent_name)
if producer:
producer.send(agent_output.json().encode("utf-8"))
logger.debug(
f"Published output for agent {agent.agent_name} to Pulsar topic."
)
else:
logger.warning(
f"No producer found for agent {agent.agent_name}. Skipping publish step."
)
except Exception as e:
logger.error(
f"Failed to publish output for agent {agent.agent_name}: {e}"
)
return agent_output
def run(self, task: str) -> SwarmOutputSchema:
"""
Runs the swarm by executing the task across all agents sequentially and returns aggregated results.
:param task: The task string to be executed by the swarm.
:return: SwarmOutputSchema containing results from all agents.
"""
try:
self.connect_pulsar()
self.initialize_producers()
for agent in self.agents:
result = self.run_task(agent, task)
self.swarm_results.results.append(result)
logger.info("Swarm run completed successfully.")
return self.swarm_results
except Exception as e:
logger.error(f"Swarm run encountered an error: {e}")
raise
finally:
if self.client:
self.client.close()
logger.info("Pulsar client connection closed.")
# Example usage
if __name__ == "__main__":
# Initialize OpenAIChat model
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
logger.error(
"OPENAI_API_KEY environment variable is not set."
)
sys.exit(1)
model = OpenAIChat(
api_key=api_key, model_name="gpt-4", temperature=0.1
)
# Define agents
agent1 = Agent(
agent_name="Financial-Analysis-Agent",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
llm=model,
max_loops=1,
autosave=True,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="finance_agent.json",
user_name="swarms_corp",
retry_attempts=1,
context_length=2000,
return_step_meta=False,
)
agent2 = Agent(
agent_name="Market-Analysis-Agent",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
llm=model,
max_loops=1,
autosave=True,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="market_agent.json",
user_name="swarms_corp",
retry_attempts=1,
context_length=2000,
return_step_meta=False,
)
# Initialize and run swarm
swarm = SwarmManager(agents=[agent1, agent2])
task_description = "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria?"
results = swarm.run(task_description)
# Output results
print(results.json(indent=4))
Loading…
Cancel
Save