Update pulsar_swarm.py

pull/591/head
kirill670 3 months ago committed by GitHub
parent edc293cb6f
commit 998a37d783
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -12,6 +12,7 @@ from swarms.prompts.finance_agent_sys_prompt import (
from pulsar import Client, Producer from pulsar import Client, Producer
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from loguru import logger from loguru import logger
import json
# Configure Loguru logger # Configure Loguru logger
logger.remove() logger.remove()
@ -52,191 +53,110 @@ class SwarmManager:
self, self,
agents: List[Agent], agents: List[Agent],
pulsar_service_url: str = PULSAR_SERVICE_URL, pulsar_service_url: str = PULSAR_SERVICE_URL,
topic_prefix: str = "swarm_topic_", # Prefix for Pulsar topics
): ):
"""
Initializes the SwarmManager with a list of agents and Pulsar service URL.
:param agents: List of Agent instances.
:param pulsar_service_url: URL of the Apache Pulsar service.
"""
self.agents = agents self.agents = agents
self.pulsar_service_url = pulsar_service_url self.pulsar_service_url = pulsar_service_url
self.topic_prefix = topic_prefix
self.client: Optional[Client] = None self.client: Optional[Client] = None
self.producers: Dict[str, Producer] = {} self.producers: Dict[str, Producer] = {}
self.swarm_results = SwarmOutputSchema() self.swarm_results = SwarmOutputSchema()
def connect_pulsar(self) -> None: def connect_pulsar(self) -> None:
"""
Establishes connection to the Apache Pulsar service.
"""
try: try:
self.client = Client( self.client = Client(self.pulsar_service_url)
self.pulsar_service_url, operation_timeout_seconds=30 logger.info(f"Connected to Pulsar service at {self.pulsar_service_url}")
)
logger.info(
f"Connected to Pulsar service at {self.pulsar_service_url}"
)
except Exception as e: except Exception as e:
logger.error(f"Failed to connect to Pulsar service: {e}") logger.error(f"Failed to connect to Pulsar service: {e}")
raise raise
def initialize_producers(self) -> None: def initialize_producers(self) -> None:
"""
Initializes Pulsar producers for each agent.
"""
if not self.client: if not self.client:
logger.error("Pulsar client is not connected.")
raise ConnectionError("Pulsar client is not connected.") raise ConnectionError("Pulsar client is not connected.")
for agent in self.agents: for agent in self.agents:
topic = f"{self.topic_prefix}{agent.agent_name}"
try: try:
topic = f"{agent.agent_name}_topic"
producer = self.client.create_producer(topic) producer = self.client.create_producer(topic)
self.producers[agent.agent_name] = producer self.producers[agent.agent_name] = producer
logger.debug( logger.debug(f"Initialized producer for agent {agent.agent_name} on topic {topic}")
f"Initialized producer for agent {agent.agent_name} on topic {topic}"
)
except Exception as e: except Exception as e:
logger.error( logger.error(f"Failed to create producer for agent {agent.agent_name}: {e}")
f"Failed to create producer for agent {agent.agent_name}: {e}"
)
raise raise
def run_task(self, agent: Agent, task: str) -> AgentOutputSchema: def run_task(self, agent: Agent, task: str) -> AgentOutputSchema:
""" logger.info(f"Agent {agent.agent_name} is starting task: {task}")
Executes a task using the specified agent and returns the structured output.
:param agent: The Agent instance to execute the task.
:param task: The task string to be executed.
:return: AgentOutputSchema containing the result and metadata.
"""
logger.info(
f"Agent {agent.agent_name} is starting task: {task}"
)
timestamp = datetime.datetime.utcnow() timestamp = datetime.datetime.utcnow()
try: try:
output = agent.run(task) output = agent.run(task)
status = "Success" status = "Success"
logger.info( logger.info(f"Agent {agent.agent_name} completed task successfully.")
f"Agent {agent.agent_name} completed task successfully."
)
except Exception as e: except Exception as e:
output = str(e) output = str(e)
status = "Failed" status = "Failed"
logger.error( logger.error(f"Agent {agent.agent_name} failed to complete task: {e}")
f"Agent {agent.agent_name} failed to complete task: {e}"
)
metadata = AgentOutputMetadata( metadata = AgentOutputMetadata(
agent_name=agent.agent_name, agent_name=agent.agent_name, task=task, timestamp=timestamp, status=status
task=task,
timestamp=timestamp,
status=status,
) )
data = AgentOutputData(output=output) data = AgentOutputData(output=output)
agent_output = AgentOutputSchema(metadata=metadata, data=data) agent_output = AgentOutputSchema(metadata=metadata, data=data)
# Publish result to Pulsar topic
try: try:
producer = self.producers.get(agent.agent_name) producer = self.producers.get(agent.agent_name)
if producer: if producer:
producer.send(agent_output.json().encode("utf-8")) producer.send(agent_output.model_dump_json().encode("utf-8")) # Send as JSON string
logger.debug( logger.debug(f"Published output for agent {agent.agent_name} to Pulsar topic.")
f"Published output for agent {agent.agent_name} to Pulsar topic."
)
else: else:
logger.warning( logger.warning(f"No producer found for agent {agent.agent_name}. Skipping publish step.")
f"No producer found for agent {agent.agent_name}. Skipping publish step."
)
except Exception as e: except Exception as e:
logger.error( logger.error(f"Failed to publish output for agent {agent.agent_name}: {e}")
f"Failed to publish output for agent {agent.agent_name}: {e}"
)
return agent_output return agent_output
def run(self, task: str) -> SwarmOutputSchema: def run(self, task: str) -> SwarmOutputSchema:
"""
Runs the swarm by executing the task across all agents sequentially and returns aggregated results.
:param task: The task string to be executed by the swarm.
:return: SwarmOutputSchema containing results from all agents.
"""
try: try:
self.connect_pulsar() self.connect_pulsar()
self.initialize_producers() self.initialize_producers()
for agent in self.agents: with concurrent.futures.ThreadPoolExecutor() as executor: # Parallel execution
result = self.run_task(agent, task) futures = [executor.submit(self.run_task, agent, task) for agent in self.agents]
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
self.swarm_results.results.append(result) self.swarm_results.results.append(result)
except Exception as e:
logger.error(f"A task encountered an error: {e}")
# Add a result with error information to the SwarmOutputSchema
failed_metadata = AgentOutputMetadata(
agent_name="Unknown", # Or some other identifier
task=task,
timestamp=datetime.datetime.utcnow(),
status="Failed"
)
failed_data = AgentOutputData(output=str(e))
failed_result = AgentOutputSchema(metadata=failed_metadata, data=failed_data)
self.swarm_results.results.append(failed_result)
logger.info("Swarm run completed successfully.")
logger.info("Swarm run completed.")
return self.swarm_results return self.swarm_results
except Exception as e: except Exception as e:
logger.error(f"Swarm run encountered an error: {e}") logger.error(f"Swarm run encountered an error: {e}")
raise raise
finally: finally:
if self.client: if self.client:
self.client.close() self.client.close()
logger.info("Pulsar client connection closed.") logger.info("Pulsar client connection closed.")
# Example usage # Example usage (similar to before)
if __name__ == "__main__": if __name__ == "__main__":
# Initialize OpenAIChat model # ... (agent and model initialization)
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
logger.error(
"OPENAI_API_KEY environment variable is not set."
)
sys.exit(1)
model = OpenAIChat(
api_key=api_key, model_name="gpt-4", temperature=0.1
)
# Define agents
agent1 = Agent(
agent_name="Financial-Analysis-Agent",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
llm=model,
max_loops=1,
autosave=True,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="finance_agent.json",
user_name="swarms_corp",
retry_attempts=1,
context_length=2000,
return_step_meta=False,
)
agent2 = Agent(
agent_name="Market-Analysis-Agent",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
llm=model,
max_loops=1,
autosave=True,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="market_agent.json",
user_name="swarms_corp",
retry_attempts=1,
context_length=2000,
return_step_meta=False,
)
# Initialize and run swarm
swarm = SwarmManager(agents=[agent1, agent2]) swarm = SwarmManager(agents=[agent1, agent2])
task_description = "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria?" task_description = "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria?"
results = swarm.run(task_description) results = swarm.run(task_description)
print(results.model_dump_json(indent=4)) # Output JSON
# Output results
print(results.json(indent=4))

Loading…
Cancel
Save