parent
b1834a62f8
commit
b53cd15c24
@ -1,276 +0,0 @@
|
||||
import asyncio
|
||||
import pulsar
|
||||
|
||||
from pulsar import ConsumerType
|
||||
from loguru import logger
|
||||
from swarms import Agent
|
||||
from typing import List, Dict, Any
|
||||
import json
|
||||
|
||||
|
||||
class ScalableAsyncAgentSwarm:
|
||||
"""
|
||||
A scalable, asynchronous swarm of agents leveraging Apache Pulsar for inter-agent communication.
|
||||
Provides load balancing, health monitoring, dead letter queues, and centralized logging.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
pulsar_url: str,
|
||||
topic: str,
|
||||
dlq_topic: str,
|
||||
agents_config: List[Dict[str, Any]],
|
||||
):
|
||||
"""
|
||||
Initializes the async swarm with agents.
|
||||
|
||||
Args:
|
||||
pulsar_url (str): The URL of the Apache Pulsar broker.
|
||||
topic (str): The main topic for task distribution.
|
||||
dlq_topic (str): The Dead Letter Queue topic for failed messages.
|
||||
agents_config (List[Dict[str, Any]]): List of agent configurations with `name`, `description`, and `model_name`.
|
||||
"""
|
||||
self.pulsar_url = pulsar_url
|
||||
self.topic = topic
|
||||
self.dlq_topic = dlq_topic
|
||||
self.agents_config = agents_config
|
||||
self.client = pulsar.Client(pulsar_url)
|
||||
self.consumer = self.client.subscribe(
|
||||
topic,
|
||||
subscription_name="swarm-task-sub",
|
||||
consumer_type=ConsumerType.Shared,
|
||||
)
|
||||
self.dlq_producer = self.client.create_producer(dlq_topic)
|
||||
self.response_logger = []
|
||||
self.agents = [
|
||||
self.create_agent(config) for config in agents_config
|
||||
]
|
||||
self.agent_index = 0
|
||||
|
||||
logger.info(
|
||||
"Swarm initialized with agents: {}",
|
||||
[agent["name"] for agent in agents_config],
|
||||
)
|
||||
|
||||
def create_agent(
|
||||
self, agent_config: Dict[str, Any]
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Creates a new agent configuration with asynchronous capabilities.
|
||||
|
||||
Args:
|
||||
agent_config (Dict[str, Any]): Configuration dictionary with agent details.
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: A dictionary containing agent metadata and functionality.
|
||||
"""
|
||||
agent_name = agent_config["name"]
|
||||
description = agent_config["description"]
|
||||
model_name = agent_config.get("model_name", "gpt-4o-mini")
|
||||
|
||||
class AsyncAgent:
|
||||
"""
|
||||
An asynchronous agent that processes tasks and communicates via Apache Pulsar.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self, name: str, description: str, model_name: str
|
||||
):
|
||||
self.name = name
|
||||
self.description = description
|
||||
self.agent = Agent(
|
||||
agent_name=name,
|
||||
model_name=model_name,
|
||||
max_loops="auto",
|
||||
interactive=True,
|
||||
streaming_on=True,
|
||||
)
|
||||
logger.info(
|
||||
f"Initialized agent '{name}' - {description}"
|
||||
)
|
||||
|
||||
async def process_task(
|
||||
self, message: str
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Processes a single task using the agent.
|
||||
|
||||
Args:
|
||||
message (str): The task message.
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: JSON-formatted response.
|
||||
"""
|
||||
try:
|
||||
logger.info(
|
||||
f"Agent {self.name} processing task: {message}"
|
||||
)
|
||||
response = await asyncio.to_thread(
|
||||
self.agent.run, message
|
||||
)
|
||||
logger.info(f"Agent {self.name} completed task.")
|
||||
return {
|
||||
"agent_name": self.name,
|
||||
"response": response,
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Agent {self.name} encountered an error: {e}"
|
||||
)
|
||||
return {"agent_name": self.name, "error": str(e)}
|
||||
|
||||
return {
|
||||
"name": agent_name,
|
||||
"instance": AsyncAgent(
|
||||
agent_name, description, model_name
|
||||
),
|
||||
}
|
||||
|
||||
async def distribute_task(self, message: str):
|
||||
"""
|
||||
Distributes a task to the next available agent using round-robin.
|
||||
|
||||
Args:
|
||||
message (str): The task message.
|
||||
"""
|
||||
agent = self.agents[self.agent_index]
|
||||
self.agent_index = (self.agent_index + 1) % len(self.agents)
|
||||
|
||||
try:
|
||||
response = await agent["instance"].process_task(message)
|
||||
self.log_response(response)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Error processing task by agent {agent['name']}: {e}"
|
||||
)
|
||||
self.send_to_dlq(message)
|
||||
|
||||
async def monitor_health(self):
|
||||
"""
|
||||
Periodically monitors the health of agents.
|
||||
"""
|
||||
while True:
|
||||
logger.info("Performing health check for all agents.")
|
||||
for agent in self.agents:
|
||||
logger.info(f"Agent {agent['name']} is online.")
|
||||
await asyncio.sleep(10)
|
||||
|
||||
def send_to_dlq(self, message: str):
|
||||
"""
|
||||
Sends a failed message to the Dead Letter Queue (DLQ).
|
||||
|
||||
Args:
|
||||
message (str): The message to send to the DLQ.
|
||||
"""
|
||||
try:
|
||||
self.dlq_producer.send(message.encode("utf-8"))
|
||||
logger.info("Message sent to Dead Letter Queue.")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to send message to DLQ: {e}")
|
||||
|
||||
def log_response(self, response: Dict[str, Any]):
|
||||
"""
|
||||
Logs the response to a centralized list for later analysis.
|
||||
|
||||
Args:
|
||||
response (Dict[str, Any]): The agent's response.
|
||||
"""
|
||||
self.response_logger.append(response)
|
||||
logger.info(f"Response logged: {response}")
|
||||
|
||||
async def listen_and_distribute(self):
|
||||
"""
|
||||
Listens to the main Pulsar topic and distributes tasks to agents.
|
||||
"""
|
||||
while True:
|
||||
msg = self.consumer.receive()
|
||||
try:
|
||||
message = msg.data().decode("utf-8")
|
||||
logger.info(f"Received task: {message}")
|
||||
await self.distribute_task(message)
|
||||
self.consumer.acknowledge(msg)
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing message: {e}")
|
||||
self.send_to_dlq(msg.data().decode("utf-8"))
|
||||
self.consumer.negative_acknowledge(msg)
|
||||
|
||||
async def run(self):
|
||||
"""
|
||||
Runs the swarm asynchronously with health monitoring and task distribution.
|
||||
"""
|
||||
logger.info("Starting the async swarm...")
|
||||
task_listener = asyncio.create_task(
|
||||
self.listen_and_distribute()
|
||||
)
|
||||
health_monitor = asyncio.create_task(self.monitor_health())
|
||||
await asyncio.gather(task_listener, health_monitor)
|
||||
|
||||
def shutdown(self):
|
||||
"""
|
||||
Safely shuts down the swarm and logs all responses.
|
||||
"""
|
||||
logger.info("Shutting down the swarm...")
|
||||
self.client.close()
|
||||
with open("responses.json", "w") as f:
|
||||
json.dump(self.response_logger, f, indent=4)
|
||||
logger.info("Responses saved to 'responses.json'.")
|
||||
|
||||
|
||||
# from scalable_agent_swarm import ScalableAsyncAgentSwarm # Assuming your swarm class is saved here
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Example Configuration
|
||||
PULSAR_URL = "pulsar://localhost:6650"
|
||||
TOPIC = "stock-analysis"
|
||||
DLQ_TOPIC = "stock-analysis-dlq"
|
||||
|
||||
# Agents configuration
|
||||
AGENTS_CONFIG = [
|
||||
{
|
||||
"name": "Stock-Analysis-Agent-1",
|
||||
"description": "Analyzes stock trends.",
|
||||
"model_name": "gpt-4o-mini",
|
||||
},
|
||||
{
|
||||
"name": "Stock-News-Agent",
|
||||
"description": "Summarizes stock news.",
|
||||
"model_name": "gpt-4o-mini",
|
||||
},
|
||||
{
|
||||
"name": "Tech-Trends-Agent",
|
||||
"description": "Tracks tech sector trends.",
|
||||
"model_name": "gpt-4o-mini",
|
||||
},
|
||||
]
|
||||
|
||||
# Tasks to send
|
||||
TASKS = [
|
||||
"Analyze the trend for tech stocks in Q4 2024",
|
||||
"Summarize the latest news on the S&P 500",
|
||||
"Identify the top-performing sectors in the stock market",
|
||||
"Provide a forecast for AI-related stocks for 2025",
|
||||
]
|
||||
|
||||
# Initialize and run the swarm
|
||||
swarm = ScalableAsyncAgentSwarm(
|
||||
PULSAR_URL, TOPIC, DLQ_TOPIC, AGENTS_CONFIG
|
||||
)
|
||||
try:
|
||||
# Run the swarm in the background
|
||||
swarm_task = asyncio.create_task(swarm.run())
|
||||
|
||||
# Send tasks to the topic
|
||||
client = pulsar.Client(PULSAR_URL)
|
||||
producer = client.create_producer(TOPIC)
|
||||
|
||||
for task in TASKS:
|
||||
producer.send(task.encode("utf-8"))
|
||||
print(f"Sent task: {task}")
|
||||
|
||||
producer.close()
|
||||
client.close()
|
||||
|
||||
# Keep the swarm running
|
||||
asyncio.run(swarm_task)
|
||||
except KeyboardInterrupt:
|
||||
swarm.shutdown()
|
Loading…
Reference in new issue