You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
277 lines
8.9 KiB
277 lines
8.9 KiB
import asyncio
|
|
import pulsar
|
|
|
|
from pulsar import ConsumerType
|
|
from loguru import logger
|
|
from swarms import Agent
|
|
from typing import List, Dict, Any
|
|
import json
|
|
|
|
|
|
class ScalableAsyncAgentSwarm:
|
|
"""
|
|
A scalable, asynchronous swarm of agents leveraging Apache Pulsar for inter-agent communication.
|
|
Provides load balancing, health monitoring, dead letter queues, and centralized logging.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
pulsar_url: str,
|
|
topic: str,
|
|
dlq_topic: str,
|
|
agents_config: List[Dict[str, Any]],
|
|
):
|
|
"""
|
|
Initializes the async swarm with agents.
|
|
|
|
Args:
|
|
pulsar_url (str): The URL of the Apache Pulsar broker.
|
|
topic (str): The main topic for task distribution.
|
|
dlq_topic (str): The Dead Letter Queue topic for failed messages.
|
|
agents_config (List[Dict[str, Any]]): List of agent configurations with `name`, `description`, and `model_name`.
|
|
"""
|
|
self.pulsar_url = pulsar_url
|
|
self.topic = topic
|
|
self.dlq_topic = dlq_topic
|
|
self.agents_config = agents_config
|
|
self.client = pulsar.Client(pulsar_url)
|
|
self.consumer = self.client.subscribe(
|
|
topic,
|
|
subscription_name="swarm-task-sub",
|
|
consumer_type=ConsumerType.Shared,
|
|
)
|
|
self.dlq_producer = self.client.create_producer(dlq_topic)
|
|
self.response_logger = []
|
|
self.agents = [
|
|
self.create_agent(config) for config in agents_config
|
|
]
|
|
self.agent_index = 0
|
|
|
|
logger.info(
|
|
"Swarm initialized with agents: {}",
|
|
[agent["name"] for agent in agents_config],
|
|
)
|
|
|
|
def create_agent(
|
|
self, agent_config: Dict[str, Any]
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Creates a new agent configuration with asynchronous capabilities.
|
|
|
|
Args:
|
|
agent_config (Dict[str, Any]): Configuration dictionary with agent details.
|
|
|
|
Returns:
|
|
Dict[str, Any]: A dictionary containing agent metadata and functionality.
|
|
"""
|
|
agent_name = agent_config["name"]
|
|
description = agent_config["description"]
|
|
model_name = agent_config.get("model_name", "gpt-4o-mini")
|
|
|
|
class AsyncAgent:
|
|
"""
|
|
An asynchronous agent that processes tasks and communicates via Apache Pulsar.
|
|
"""
|
|
|
|
def __init__(
|
|
self, name: str, description: str, model_name: str
|
|
):
|
|
self.name = name
|
|
self.description = description
|
|
self.agent = Agent(
|
|
agent_name=name,
|
|
model_name=model_name,
|
|
max_loops="auto",
|
|
interactive=True,
|
|
streaming_on=True,
|
|
)
|
|
logger.info(
|
|
f"Initialized agent '{name}' - {description}"
|
|
)
|
|
|
|
async def process_task(
|
|
self, message: str
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Processes a single task using the agent.
|
|
|
|
Args:
|
|
message (str): The task message.
|
|
|
|
Returns:
|
|
Dict[str, Any]: JSON-formatted response.
|
|
"""
|
|
try:
|
|
logger.info(
|
|
f"Agent {self.name} processing task: {message}"
|
|
)
|
|
response = await asyncio.to_thread(
|
|
self.agent.run, message
|
|
)
|
|
logger.info(f"Agent {self.name} completed task.")
|
|
return {
|
|
"agent_name": self.name,
|
|
"response": response,
|
|
}
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Agent {self.name} encountered an error: {e}"
|
|
)
|
|
return {"agent_name": self.name, "error": str(e)}
|
|
|
|
return {
|
|
"name": agent_name,
|
|
"instance": AsyncAgent(
|
|
agent_name, description, model_name
|
|
),
|
|
}
|
|
|
|
async def distribute_task(self, message: str):
|
|
"""
|
|
Distributes a task to the next available agent using round-robin.
|
|
|
|
Args:
|
|
message (str): The task message.
|
|
"""
|
|
agent = self.agents[self.agent_index]
|
|
self.agent_index = (self.agent_index + 1) % len(self.agents)
|
|
|
|
try:
|
|
response = await agent["instance"].process_task(message)
|
|
self.log_response(response)
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error processing task by agent {agent['name']}: {e}"
|
|
)
|
|
self.send_to_dlq(message)
|
|
|
|
async def monitor_health(self):
|
|
"""
|
|
Periodically monitors the health of agents.
|
|
"""
|
|
while True:
|
|
logger.info("Performing health check for all agents.")
|
|
for agent in self.agents:
|
|
logger.info(f"Agent {agent['name']} is online.")
|
|
await asyncio.sleep(10)
|
|
|
|
def send_to_dlq(self, message: str):
|
|
"""
|
|
Sends a failed message to the Dead Letter Queue (DLQ).
|
|
|
|
Args:
|
|
message (str): The message to send to the DLQ.
|
|
"""
|
|
try:
|
|
self.dlq_producer.send(message.encode("utf-8"))
|
|
logger.info("Message sent to Dead Letter Queue.")
|
|
except Exception as e:
|
|
logger.error(f"Failed to send message to DLQ: {e}")
|
|
|
|
def log_response(self, response: Dict[str, Any]):
|
|
"""
|
|
Logs the response to a centralized list for later analysis.
|
|
|
|
Args:
|
|
response (Dict[str, Any]): The agent's response.
|
|
"""
|
|
self.response_logger.append(response)
|
|
logger.info(f"Response logged: {response}")
|
|
|
|
async def listen_and_distribute(self):
|
|
"""
|
|
Listens to the main Pulsar topic and distributes tasks to agents.
|
|
"""
|
|
while True:
|
|
msg = self.consumer.receive()
|
|
try:
|
|
message = msg.data().decode("utf-8")
|
|
logger.info(f"Received task: {message}")
|
|
await self.distribute_task(message)
|
|
self.consumer.acknowledge(msg)
|
|
except Exception as e:
|
|
logger.error(f"Error processing message: {e}")
|
|
self.send_to_dlq(msg.data().decode("utf-8"))
|
|
self.consumer.negative_acknowledge(msg)
|
|
|
|
async def run(self):
|
|
"""
|
|
Runs the swarm asynchronously with health monitoring and task distribution.
|
|
"""
|
|
logger.info("Starting the async swarm...")
|
|
task_listener = asyncio.create_task(
|
|
self.listen_and_distribute()
|
|
)
|
|
health_monitor = asyncio.create_task(self.monitor_health())
|
|
await asyncio.gather(task_listener, health_monitor)
|
|
|
|
def shutdown(self):
|
|
"""
|
|
Safely shuts down the swarm and logs all responses.
|
|
"""
|
|
logger.info("Shutting down the swarm...")
|
|
self.client.close()
|
|
with open("responses.json", "w") as f:
|
|
json.dump(self.response_logger, f, indent=4)
|
|
logger.info("Responses saved to 'responses.json'.")
|
|
|
|
|
|
# from scalable_agent_swarm import ScalableAsyncAgentSwarm # Assuming your swarm class is saved here
|
|
|
|
if __name__ == "__main__":
|
|
# Example Configuration
|
|
PULSAR_URL = "pulsar://localhost:6650"
|
|
TOPIC = "stock-analysis"
|
|
DLQ_TOPIC = "stock-analysis-dlq"
|
|
|
|
# Agents configuration
|
|
AGENTS_CONFIG = [
|
|
{
|
|
"name": "Stock-Analysis-Agent-1",
|
|
"description": "Analyzes stock trends.",
|
|
"model_name": "gpt-4o-mini",
|
|
},
|
|
{
|
|
"name": "Stock-News-Agent",
|
|
"description": "Summarizes stock news.",
|
|
"model_name": "gpt-4o-mini",
|
|
},
|
|
{
|
|
"name": "Tech-Trends-Agent",
|
|
"description": "Tracks tech sector trends.",
|
|
"model_name": "gpt-4o-mini",
|
|
},
|
|
]
|
|
|
|
# Tasks to send
|
|
TASKS = [
|
|
"Analyze the trend for tech stocks in Q4 2024",
|
|
"Summarize the latest news on the S&P 500",
|
|
"Identify the top-performing sectors in the stock market",
|
|
"Provide a forecast for AI-related stocks for 2025",
|
|
]
|
|
|
|
# Initialize and run the swarm
|
|
swarm = ScalableAsyncAgentSwarm(
|
|
PULSAR_URL, TOPIC, DLQ_TOPIC, AGENTS_CONFIG
|
|
)
|
|
try:
|
|
# Run the swarm in the background
|
|
swarm_task = asyncio.create_task(swarm.run())
|
|
|
|
# Send tasks to the topic
|
|
client = pulsar.Client(PULSAR_URL)
|
|
producer = client.create_producer(TOPIC)
|
|
|
|
for task in TASKS:
|
|
producer.send(task.encode("utf-8"))
|
|
print(f"Sent task: {task}")
|
|
|
|
producer.close()
|
|
client.close()
|
|
|
|
# Keep the swarm running
|
|
asyncio.run(swarm_task)
|
|
except KeyboardInterrupt:
|
|
swarm.shutdown()
|