You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
swarms/pulsar_swarm.py

277 lines
8.9 KiB

1 month ago
import asyncio
import pulsar
from pulsar import ConsumerType
from loguru import logger
from swarms import Agent
from typing import List, Dict, Any
import json
class ScalableAsyncAgentSwarm:
"""
A scalable, asynchronous swarm of agents leveraging Apache Pulsar for inter-agent communication.
Provides load balancing, health monitoring, dead letter queues, and centralized logging.
"""
def __init__(
self,
pulsar_url: str,
topic: str,
dlq_topic: str,
agents_config: List[Dict[str, Any]],
):
"""
Initializes the async swarm with agents.
Args:
pulsar_url (str): The URL of the Apache Pulsar broker.
topic (str): The main topic for task distribution.
dlq_topic (str): The Dead Letter Queue topic for failed messages.
agents_config (List[Dict[str, Any]]): List of agent configurations with `name`, `description`, and `model_name`.
"""
self.pulsar_url = pulsar_url
self.topic = topic
self.dlq_topic = dlq_topic
self.agents_config = agents_config
self.client = pulsar.Client(pulsar_url)
self.consumer = self.client.subscribe(
topic,
subscription_name="swarm-task-sub",
consumer_type=ConsumerType.Shared,
)
self.dlq_producer = self.client.create_producer(dlq_topic)
self.response_logger = []
self.agents = [
self.create_agent(config) for config in agents_config
]
self.agent_index = 0
logger.info(
"Swarm initialized with agents: {}",
[agent["name"] for agent in agents_config],
)
def create_agent(
self, agent_config: Dict[str, Any]
) -> Dict[str, Any]:
"""
Creates a new agent configuration with asynchronous capabilities.
Args:
agent_config (Dict[str, Any]): Configuration dictionary with agent details.
Returns:
Dict[str, Any]: A dictionary containing agent metadata and functionality.
"""
agent_name = agent_config["name"]
description = agent_config["description"]
model_name = agent_config.get("model_name", "gpt-4o-mini")
class AsyncAgent:
"""
An asynchronous agent that processes tasks and communicates via Apache Pulsar.
"""
def __init__(
self, name: str, description: str, model_name: str
):
self.name = name
self.description = description
self.agent = Agent(
agent_name=name,
model_name=model_name,
max_loops="auto",
interactive=True,
streaming_on=True,
)
logger.info(
f"Initialized agent '{name}' - {description}"
)
async def process_task(
self, message: str
) -> Dict[str, Any]:
"""
Processes a single task using the agent.
Args:
message (str): The task message.
Returns:
Dict[str, Any]: JSON-formatted response.
"""
try:
logger.info(
f"Agent {self.name} processing task: {message}"
)
response = await asyncio.to_thread(
self.agent.run, message
)
logger.info(f"Agent {self.name} completed task.")
return {
"agent_name": self.name,
"response": response,
}
except Exception as e:
logger.error(
f"Agent {self.name} encountered an error: {e}"
)
return {"agent_name": self.name, "error": str(e)}
return {
"name": agent_name,
"instance": AsyncAgent(
agent_name, description, model_name
),
}
async def distribute_task(self, message: str):
"""
Distributes a task to the next available agent using round-robin.
Args:
message (str): The task message.
"""
agent = self.agents[self.agent_index]
self.agent_index = (self.agent_index + 1) % len(self.agents)
try:
response = await agent["instance"].process_task(message)
self.log_response(response)
except Exception as e:
logger.error(
f"Error processing task by agent {agent['name']}: {e}"
)
self.send_to_dlq(message)
async def monitor_health(self):
"""
Periodically monitors the health of agents.
"""
while True:
logger.info("Performing health check for all agents.")
for agent in self.agents:
logger.info(f"Agent {agent['name']} is online.")
await asyncio.sleep(10)
def send_to_dlq(self, message: str):
"""
Sends a failed message to the Dead Letter Queue (DLQ).
Args:
message (str): The message to send to the DLQ.
"""
try:
self.dlq_producer.send(message.encode("utf-8"))
logger.info("Message sent to Dead Letter Queue.")
except Exception as e:
logger.error(f"Failed to send message to DLQ: {e}")
def log_response(self, response: Dict[str, Any]):
"""
Logs the response to a centralized list for later analysis.
Args:
response (Dict[str, Any]): The agent's response.
"""
self.response_logger.append(response)
logger.info(f"Response logged: {response}")
async def listen_and_distribute(self):
"""
Listens to the main Pulsar topic and distributes tasks to agents.
"""
while True:
msg = self.consumer.receive()
try:
message = msg.data().decode("utf-8")
logger.info(f"Received task: {message}")
await self.distribute_task(message)
self.consumer.acknowledge(msg)
except Exception as e:
logger.error(f"Error processing message: {e}")
self.send_to_dlq(msg.data().decode("utf-8"))
self.consumer.negative_acknowledge(msg)
async def run(self):
"""
Runs the swarm asynchronously with health monitoring and task distribution.
"""
logger.info("Starting the async swarm...")
task_listener = asyncio.create_task(
self.listen_and_distribute()
)
health_monitor = asyncio.create_task(self.monitor_health())
await asyncio.gather(task_listener, health_monitor)
def shutdown(self):
"""
Safely shuts down the swarm and logs all responses.
"""
logger.info("Shutting down the swarm...")
self.client.close()
with open("responses.json", "w") as f:
json.dump(self.response_logger, f, indent=4)
logger.info("Responses saved to 'responses.json'.")
# from scalable_agent_swarm import ScalableAsyncAgentSwarm # Assuming your swarm class is saved here
if __name__ == "__main__":
# Example Configuration
PULSAR_URL = "pulsar://localhost:6650"
TOPIC = "stock-analysis"
DLQ_TOPIC = "stock-analysis-dlq"
# Agents configuration
AGENTS_CONFIG = [
{
"name": "Stock-Analysis-Agent-1",
"description": "Analyzes stock trends.",
"model_name": "gpt-4o-mini",
},
{
"name": "Stock-News-Agent",
"description": "Summarizes stock news.",
"model_name": "gpt-4o-mini",
},
{
"name": "Tech-Trends-Agent",
"description": "Tracks tech sector trends.",
"model_name": "gpt-4o-mini",
},
]
# Tasks to send
TASKS = [
"Analyze the trend for tech stocks in Q4 2024",
"Summarize the latest news on the S&P 500",
"Identify the top-performing sectors in the stock market",
"Provide a forecast for AI-related stocks for 2025",
]
# Initialize and run the swarm
swarm = ScalableAsyncAgentSwarm(
PULSAR_URL, TOPIC, DLQ_TOPIC, AGENTS_CONFIG
)
try:
# Run the swarm in the background
swarm_task = asyncio.create_task(swarm.run())
# Send tasks to the topic
client = pulsar.Client(PULSAR_URL)
producer = client.create_producer(TOPIC)
for task in TASKS:
producer.send(task.encode("utf-8"))
print(f"Sent task: {task}")
producer.close()
client.close()
# Keep the swarm running
asyncio.run(swarm_task)
except KeyboardInterrupt:
swarm.shutdown()