From b53cd15c242083699eaa1c01ee92dfb853f1e0af Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Mon, 23 Dec 2024 11:39:56 -0800 Subject: [PATCH] [AUTO SWARM BUILDER LAZY LOADING] --- pyproject.toml | 2 +- swarm_arange_demo.py | 11 +- swarms/structs/pulsar_swarm.py | 276 -------------------------------- swarms/structs/swarm_builder.py | 15 +- 4 files changed, 18 insertions(+), 286 deletions(-) delete mode 100644 swarms/structs/pulsar_swarm.py diff --git a/pyproject.toml b/pyproject.toml index b4c7cf98..c8b174e4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "6.7.7" +version = "6.7.8" description = "Swarms - TGSC" license = "MIT" authors = ["Kye Gomez "] diff --git a/swarm_arange_demo.py b/swarm_arange_demo.py index f900abcc..de0051f0 100644 --- a/swarm_arange_demo.py +++ b/swarm_arange_demo.py @@ -2,11 +2,9 @@ import os from swarm_models import OpenAIChat -from swarms import Agent, AgentRearrange -from swarms.structs.swarm_arange import SwarmRearrange +from swarms import Agent, AgentRearrange, SwarmRearrange -# model = Anthropic(anthropic_api_key=os.getenv("ANTHROPIC_API_KEY")) -company = "TGSC" +company = "NVDA" # Get the OpenAI API key from the environment variable api_key = os.getenv("GROQ_API_KEY") @@ -203,16 +201,19 @@ blackstone_market_analysis = AgentRearrange( ) swarm_arrange = SwarmRearrange( + name = "Blackstone-Swarm", + description = "A swarm that processes tasks concurrently using multiple agents and rearranges the flow based on the task requirements.", swarms=[ blackstone_acquisition_analysis, blackstone_investment_strategy, blackstone_market_analysis, ], flow=f"{blackstone_acquisition_analysis.name} -> {blackstone_investment_strategy.name} -> {blackstone_market_analysis.name}", + max_loops=1, ) print( swarm_arrange.run( - "Analyze AI ETFs, focusing on their performance, market trends, and potential for growth" + "Analyze NVIDIA's performance, market trends, and potential for growth in the AI industry" ) ) diff --git a/swarms/structs/pulsar_swarm.py b/swarms/structs/pulsar_swarm.py deleted file mode 100644 index 2d8961f7..00000000 --- a/swarms/structs/pulsar_swarm.py +++ /dev/null @@ -1,276 +0,0 @@ -import asyncio -import pulsar - -from pulsar import ConsumerType -from loguru import logger -from swarms import Agent -from typing import List, Dict, Any -import json - - -class ScalableAsyncAgentSwarm: - """ - A scalable, asynchronous swarm of agents leveraging Apache Pulsar for inter-agent communication. - Provides load balancing, health monitoring, dead letter queues, and centralized logging. - """ - - def __init__( - self, - pulsar_url: str, - topic: str, - dlq_topic: str, - agents_config: List[Dict[str, Any]], - ): - """ - Initializes the async swarm with agents. - - Args: - pulsar_url (str): The URL of the Apache Pulsar broker. - topic (str): The main topic for task distribution. - dlq_topic (str): The Dead Letter Queue topic for failed messages. - agents_config (List[Dict[str, Any]]): List of agent configurations with `name`, `description`, and `model_name`. - """ - self.pulsar_url = pulsar_url - self.topic = topic - self.dlq_topic = dlq_topic - self.agents_config = agents_config - self.client = pulsar.Client(pulsar_url) - self.consumer = self.client.subscribe( - topic, - subscription_name="swarm-task-sub", - consumer_type=ConsumerType.Shared, - ) - self.dlq_producer = self.client.create_producer(dlq_topic) - self.response_logger = [] - self.agents = [ - self.create_agent(config) for config in agents_config - ] - self.agent_index = 0 - - logger.info( - "Swarm initialized with agents: {}", - [agent["name"] for agent in agents_config], - ) - - def create_agent( - self, agent_config: Dict[str, Any] - ) -> Dict[str, Any]: - """ - Creates a new agent configuration with asynchronous capabilities. - - Args: - agent_config (Dict[str, Any]): Configuration dictionary with agent details. - - Returns: - Dict[str, Any]: A dictionary containing agent metadata and functionality. - """ - agent_name = agent_config["name"] - description = agent_config["description"] - model_name = agent_config.get("model_name", "gpt-4o-mini") - - class AsyncAgent: - """ - An asynchronous agent that processes tasks and communicates via Apache Pulsar. - """ - - def __init__( - self, name: str, description: str, model_name: str - ): - self.name = name - self.description = description - self.agent = Agent( - agent_name=name, - model_name=model_name, - max_loops="auto", - interactive=True, - streaming_on=True, - ) - logger.info( - f"Initialized agent '{name}' - {description}" - ) - - async def process_task( - self, message: str - ) -> Dict[str, Any]: - """ - Processes a single task using the agent. - - Args: - message (str): The task message. - - Returns: - Dict[str, Any]: JSON-formatted response. - """ - try: - logger.info( - f"Agent {self.name} processing task: {message}" - ) - response = await asyncio.to_thread( - self.agent.run, message - ) - logger.info(f"Agent {self.name} completed task.") - return { - "agent_name": self.name, - "response": response, - } - except Exception as e: - logger.error( - f"Agent {self.name} encountered an error: {e}" - ) - return {"agent_name": self.name, "error": str(e)} - - return { - "name": agent_name, - "instance": AsyncAgent( - agent_name, description, model_name - ), - } - - async def distribute_task(self, message: str): - """ - Distributes a task to the next available agent using round-robin. - - Args: - message (str): The task message. - """ - agent = self.agents[self.agent_index] - self.agent_index = (self.agent_index + 1) % len(self.agents) - - try: - response = await agent["instance"].process_task(message) - self.log_response(response) - except Exception as e: - logger.error( - f"Error processing task by agent {agent['name']}: {e}" - ) - self.send_to_dlq(message) - - async def monitor_health(self): - """ - Periodically monitors the health of agents. - """ - while True: - logger.info("Performing health check for all agents.") - for agent in self.agents: - logger.info(f"Agent {agent['name']} is online.") - await asyncio.sleep(10) - - def send_to_dlq(self, message: str): - """ - Sends a failed message to the Dead Letter Queue (DLQ). - - Args: - message (str): The message to send to the DLQ. - """ - try: - self.dlq_producer.send(message.encode("utf-8")) - logger.info("Message sent to Dead Letter Queue.") - except Exception as e: - logger.error(f"Failed to send message to DLQ: {e}") - - def log_response(self, response: Dict[str, Any]): - """ - Logs the response to a centralized list for later analysis. - - Args: - response (Dict[str, Any]): The agent's response. - """ - self.response_logger.append(response) - logger.info(f"Response logged: {response}") - - async def listen_and_distribute(self): - """ - Listens to the main Pulsar topic and distributes tasks to agents. - """ - while True: - msg = self.consumer.receive() - try: - message = msg.data().decode("utf-8") - logger.info(f"Received task: {message}") - await self.distribute_task(message) - self.consumer.acknowledge(msg) - except Exception as e: - logger.error(f"Error processing message: {e}") - self.send_to_dlq(msg.data().decode("utf-8")) - self.consumer.negative_acknowledge(msg) - - async def run(self): - """ - Runs the swarm asynchronously with health monitoring and task distribution. - """ - logger.info("Starting the async swarm...") - task_listener = asyncio.create_task( - self.listen_and_distribute() - ) - health_monitor = asyncio.create_task(self.monitor_health()) - await asyncio.gather(task_listener, health_monitor) - - def shutdown(self): - """ - Safely shuts down the swarm and logs all responses. - """ - logger.info("Shutting down the swarm...") - self.client.close() - with open("responses.json", "w") as f: - json.dump(self.response_logger, f, indent=4) - logger.info("Responses saved to 'responses.json'.") - - -# from scalable_agent_swarm import ScalableAsyncAgentSwarm # Assuming your swarm class is saved here - -if __name__ == "__main__": - # Example Configuration - PULSAR_URL = "pulsar://localhost:6650" - TOPIC = "stock-analysis" - DLQ_TOPIC = "stock-analysis-dlq" - - # Agents configuration - AGENTS_CONFIG = [ - { - "name": "Stock-Analysis-Agent-1", - "description": "Analyzes stock trends.", - "model_name": "gpt-4o-mini", - }, - { - "name": "Stock-News-Agent", - "description": "Summarizes stock news.", - "model_name": "gpt-4o-mini", - }, - { - "name": "Tech-Trends-Agent", - "description": "Tracks tech sector trends.", - "model_name": "gpt-4o-mini", - }, - ] - - # Tasks to send - TASKS = [ - "Analyze the trend for tech stocks in Q4 2024", - "Summarize the latest news on the S&P 500", - "Identify the top-performing sectors in the stock market", - "Provide a forecast for AI-related stocks for 2025", - ] - - # Initialize and run the swarm - swarm = ScalableAsyncAgentSwarm( - PULSAR_URL, TOPIC, DLQ_TOPIC, AGENTS_CONFIG - ) - try: - # Run the swarm in the background - swarm_task = asyncio.create_task(swarm.run()) - - # Send tasks to the topic - client = pulsar.Client(PULSAR_URL) - producer = client.create_producer(TOPIC) - - for task in TASKS: - producer.send(task.encode("utf-8")) - print(f"Sent task: {task}") - - producer.close() - client.close() - - # Keep the swarm running - asyncio.run(swarm_task) - except KeyboardInterrupt: - swarm.shutdown() diff --git a/swarms/structs/swarm_builder.py b/swarms/structs/swarm_builder.py index 24ac34b0..2e963272 100644 --- a/swarms/structs/swarm_builder.py +++ b/swarms/structs/swarm_builder.py @@ -1,8 +1,7 @@ import os +import subprocess from typing import List, Optional -from dotenv import load_dotenv -from openai import OpenAI from pydantic import BaseModel, Field from pydantic.v1 import validator from swarm_models import OpenAIChat @@ -18,8 +17,6 @@ from loguru import logger logger.add("swarm_builder.log", rotation="10 MB", backtrace=True) -load_dotenv() - class OpenAIFunctionCaller: """ @@ -47,6 +44,16 @@ class OpenAIFunctionCaller: self.temperature = temperature self.base_model = base_model self.max_tokens = max_tokens + + + try: + from openai import OpenAI + except ImportError: + logger.error("OpenAI library not found. Please install the OpenAI library by running 'pip install openai'") + subprocess.run(["pip", "install", "openai"]) + from openai import OpenAI + + self.client = OpenAI(api_key=api_key) def run(self, task: str, *args, **kwargs) -> BaseModel: