From 414adf0af81a854aecf812eebe95f78e2a09b310 Mon Sep 17 00:00:00 2001 From: Kye Date: Thu, 16 May 2024 13:15:50 -0700 Subject: [PATCH] [FEAT][Human in the loop][AgentRearrange] --- agent_rearrange_human_in_loop.py | 64 ++++++++ docs/mkdocs.yml | 1 + docs/swarms_cloud/architecture.md | 138 ++++++++++++++++++ .../swarm_of_gpt4o_agents_for_octomogly.py | 6 +- scripts/log_cleanup.py | 10 +- swarms/structs/rearrange.py | 113 ++++++++++++-- 6 files changed, 308 insertions(+), 24 deletions(-) create mode 100644 agent_rearrange_human_in_loop.py create mode 100644 docs/swarms_cloud/architecture.md rename swarm_of_gpt4o_agents_for_octomogly.py => playground/demos/octomology_swarm/swarm_of_gpt4o_agents_for_octomogly.py (98%) diff --git a/agent_rearrange_human_in_loop.py b/agent_rearrange_human_in_loop.py new file mode 100644 index 00000000..126442cf --- /dev/null +++ b/agent_rearrange_human_in_loop.py @@ -0,0 +1,64 @@ +from swarms import Agent, AgentRearrange, Anthropic + + +# Initialize the director agent + +director = Agent( + agent_name="Director", + system_prompt="Directs the tasks for the workers", + llm=Anthropic(), + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + stopping_token="", + state_save_file_type="json", + saved_state_path="director.json", +) + + +# Initialize worker 1 + +worker1 = Agent( + agent_name="Worker1", + system_prompt="Generates a transcript for a youtube video on what swarms are", + llm=Anthropic(), + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + stopping_token="", + state_save_file_type="json", + saved_state_path="worker1.json", +) + + +# Initialize worker 2 +worker2 = Agent( + agent_name="Worker2", + system_prompt="Summarizes the transcript generated by Worker1", + llm=Anthropic(), + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + stopping_token="", + state_save_file_type="json", + saved_state_path="worker2.json", +) + + +# Create a list of agents +agents = [director, worker1, 
worker2] + +# Define the flow pattern +flow = "Director -> H -> Worker1 -> Worker2" + +# Using AgentRearrange class +agent_system = AgentRearrange( + agents=agents, flow=flow, human_in_the_loop=True +) +output = agent_system.run( + "Create a format to express and communicate swarms of llms in a structured manner for youtube" +) +print(output) diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 85a442e0..9d76e0d1 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -113,6 +113,7 @@ nav: - Migrate from OpenAI to Swarms in 3 lines of code: "swarms_cloud/migrate_openai.md" - Getting Started with SOTA Vision Language Models VLM: "swarms_cloud/getting_started.md" - Enterprise Guide to High-Performance Multi-Agent LLM Deployments: "swarms_cloud/production_deployment.md" + - Under The Hood The Swarm Cloud Serving Infrastructure: "swarms_cloud/architecture.md" - Swarms Framework [PY]: - Overview: "swarms/index.md" - DIY Build Your Own Agent: "diy_your_own_agent.md" diff --git a/docs/swarms_cloud/architecture.md b/docs/swarms_cloud/architecture.md new file mode 100644 index 00000000..0a0e7db4 --- /dev/null +++ b/docs/swarms_cloud/architecture.md @@ -0,0 +1,138 @@ +# Under The Hood: The Swarm Cloud Serving Infrastructure +----------------------------------------------------------------- + +This blog post delves into the intricate workings of our serving model infrastructure, providing a comprehensive understanding for both users and infrastructure engineers. We'll embark on a journey that starts with an API request and culminates in a response generated by your chosen model, all orchestrated within a multi-cloud environment. + +### The Journey of an API Request + +1. **The Gateway:** Your API request first arrives at an EC2 instance running SkyPilot, a lightweight controller. + +2. **Intelligent Routing:** SkyPilot, wielding its decision-making prowess, analyzes the request and identifies the most suitable GPU in our multi-cloud setup. 
Factors like resource availability, latency, and cost might influence this choice. + +3. **Multi-Cloud Agility:** Based on the chosen cloud provider (AWS or Azure), SkyPilot seamlessly directs the request to the appropriate containerized model residing in a sky clusters cluster. Here's where the magic of cloud-agnostic deployments comes into play. + +### Unveiling the Architecture + +Let's dissect the technical architecture behind this process: + +- **SkyPilot (EC2 Instance):** This lightweight controller, deployed on an EC2 instance, acts as the central hub for orchestrating requests and routing them to suitable model instances. + +- **Swarm Cloud Repositories:** Each model resides within its own dedicated folder on the Swarms Cloud GitHub repository (). Here, you'll find a folder structure like this: + +``` +servers/ + / + sky-serve.yaml # Deployment configuration file + / + sky-serve.yaml + ... + +``` + +- **SkyServe Deployment Tool:** This is the workhorse responsible for deploying models within sky clusters clusters. Each model's folder contains a `sky-serve.yaml` file that dictates the deployment configuration. + +### Infrastructure Engineer's Toolkit: Commands for Model Deployment + +Here's a breakdown of the `sky serve` command and its subcommands: + +- `sky serve -h`: Displays the help message for the `sky serve` CLI tool. + +**Commands:** + +- `sky serve up yaml.yaml -n --cloud aws/azure`: This command deploys a SkyServe service based on the provided `yaml.yaml` configuration file. The `-n` flag indicates a new deployment, and the `--cloud` flag specifies the target cloud platform (AWS or Azure). + +**Additional Commands:** + +- `sky serve update`: Updates a running SkyServe service. + +- `sky serve status`: Shows the status of deployed SkyServe services. + +- `sky serve down`: Tears down (stops and removes) a SkyServe service. + +- `sky serve logs`: Tails the logs of a running SkyServe service, providing valuable insights into its operation. 
+ +By leveraging these commands, infrastructure engineers can efficiently manage the deployment and lifecycle of models within the multi-cloud environment. + +**Building the Cluster and Accessing the Model:** + +When you deploy a model using `sky serve up`, SkyServe triggers the building of a sky clusters cluster, if one doesn't already exist. Once the deployment is complete, SkyServe provides you with an endpoint URL for interacting with the model. This URL allows you to send requests to the deployed model and receive its predictions. + +### Understanding the `sky-serve.yaml` Configuration + +The `sky-serve.yaml` file plays a crucial role in defining the deployment parameters for your model. This file typically includes properties such as: + +- **Image:** Specifies the Docker image containing your model code and dependencies. + +- **Replicas:** Defines the number of model replicas to be deployed in the Swarm cluster. This allows for load balancing and fault tolerance. + +- **Resources:** Sets memory and CPU resource constraints for the deployed model containers. + +- **Networking:** Configures network settings for communication within the sky clusters and with the outside world. + +**Benefits of Our Infrastructure:** + +- **Multi-Cloud Flexibility:** Deploy models seamlessly across AWS and Azure, taking advantage of whichever platform best suits your needs. + +- **Scalability:** Easily scale model deployments up or down based on traffic demands. + +- **Cost Optimization:** The intelligent routing by SkyPilot helps optimize costs by utilizing the most cost-effective cloud resources. + +- **Simplified Management:** Manage models across clouds with a single set of commands using `sky serve`. + +### Deep Dive: Technical Architecture + +**Cloud Considerations:** + +Our multi-cloud architecture offers several advantages, but it also introduces complexities that need to be addressed. 
Here's a closer look at some key considerations: + +- **Cloud Provider APIs and SDKs:** SkyPilot interacts with the APIs and SDKs of the chosen cloud provider (AWS or Azure) to manage resources like virtual machines, storage, and networking. Infrastructure engineers need to be familiar with the specific APIs and SDKs for each cloud platform to ensure smooth operation and troubleshooting. + +- **Security:** Maintaining consistent security across different cloud environments is crucial. This involves aspects like IAM (Identity and Access Management) configuration, network segmentation, and encryption of sensitive data at rest and in transit. Infrastructure engineers need to implement robust security measures tailored to each cloud provider's offerings. + +- **Network Connectivity:** Establishing secure and reliable network connectivity between SkyPilot (running on EC2), sky clusters clusters (deployed on cloud VMs), and your client applications is essential. This might involve setting up VPN tunnels or utilizing cloud-native networking solutions offered by each provider. + +- **Monitoring and Logging:** Monitoring the health and performance of SkyPilot, sky clusters clusters, and deployed models across clouds is critical for proactive issue identification and resolution. Infrastructure engineers can leverage cloud provider-specific monitoring tools alongside centralized logging solutions for comprehensive oversight. + +**sky clusters Clusters** + +sky clusters is a container orchestration platform that facilitates the deployment and management of containerized applications, including your machine learning models. When you deploy a model with `sky serve up`, SkyPilot launches a node with: + +- **Provision Resources:** SkyPilot requests resources from the chosen cloud provider (e.g., VMs with GPUs) to create a sky clusters cluster if one doesn't already exist. 
+ +- **Deploy Containerized Models:** SkyPilot leverages the `sky-serve.yaml` configuration to build Docker images containing your model code and dependencies. These images are then pushed to a container registry (e.g., Docker Hub) and deployed as containers within the Swarm cluster. + +- **Load Balancing and Service Discovery:** sky clusters provides built-in load balancing capabilities to distribute incoming requests across multiple model replicas, ensuring high availability and performance. Additionally, service discovery mechanisms allow models to find each other and communicate within the cluster. + +**SkyPilot - The Orchestrator** + +SkyPilot, the lightweight controller running on an EC2 instance, plays a central role in this infrastructure. Here's a deeper look at its functionalities: + +- **API Gateway Integration:** SkyPilot can be integrated with your API gateway or service mesh to receive incoming requests for model predictions. + +- **Request Routing:** SkyPilot analyzes the incoming request, considering factors like model compatibility, resource availability, and latency. Based on this analysis, SkyPilot selects the most suitable model instance within the appropriate sky clusters cluster. + +- **Cloud Provider Interaction:** SkyPilot interacts with the chosen cloud provider's APIs to manage resources required for the sky clusters cluster and model deployment. + +- **Model Health Monitoring:** SkyPilot can be configured to monitor the health and performance of deployed models. This might involve collecting metrics like model response times, resource utilization, and error rates. + +- **Scalability Management:** Based on pre-defined policies or real-time traffic patterns, SkyPilot can trigger the scaling of model deployments (adding or removing replicas) within the sky clusters cluster. + +**Advanced Considerations** + +This blog post has provided a foundational understanding of our serving model infrastructure. 
For infrastructure engineers seeking a deeper dive, here are some additional considerations: + +- **Container Security:** Explore container image scanning for vulnerabilities, enforcing least privilege principles within container runtime environments, and utilizing secrets management solutions for secure access to sensitive data. + +- **Model Versioning and Rollbacks:** Implement a model versioning strategy to track changes and facilitate rollbacks to previous versions if necessary. + +- **A/B Testing:** Integrate A/B testing frameworks to evaluate the performance of different model versions and configurations before full-scale deployment. + +- **Auto-Scaling with Cloud Monitoring:** Utilize cloud provider-specific monitoring services like Amazon CloudWatch or Azure Monitor to trigger auto-scaling of sky clusters clusters based on predefined metrics. + +By understanding these technical aspects and considerations, infrastructure engineers can effectively manage and optimize our multi-cloud serving model infrastructure. + +### Conclusion + +This comprehensive exploration has shed light on the intricate workings of our serving model infrastructure. We've covered the journey of an API request, delved into the technical architecture with a focus on cloud considerations, sky clusters clusters, and SkyPilot's role as the orchestrator. We've also explored advanced considerations for infrastructure engineers seeking to further optimize and secure this multi-cloud environment. + +This understanding empowers both users and infrastructure engineers to leverage this technology effectively for deploying and managing your machine learning models at scale. 
diff --git a/swarm_of_gpt4o_agents_for_octomogly.py b/playground/demos/octomology_swarm/swarm_of_gpt4o_agents_for_octomogly.py similarity index 98% rename from swarm_of_gpt4o_agents_for_octomogly.py rename to playground/demos/octomology_swarm/swarm_of_gpt4o_agents_for_octomogly.py index 15970eb1..19aead70 100644 --- a/swarm_of_gpt4o_agents_for_octomogly.py +++ b/playground/demos/octomology_swarm/swarm_of_gpt4o_agents_for_octomogly.py @@ -135,11 +135,11 @@ rearranger = AgentRearrange( verbose=True, ) -# image = "ear.png" +image = "ear_4.jpg" # Run the rearranger out = rearranger( - "Diagnose the image and provide a treatment plan for the patient", - # image, + "Diagnose this medical image, it's an ear canal, be precise", + image, ) print(out) diff --git a/scripts/log_cleanup.py b/scripts/log_cleanup.py index 3276d41f..a92f32f8 100644 --- a/scripts/log_cleanup.py +++ b/scripts/log_cleanup.py @@ -2,8 +2,8 @@ import os import shutil # Create a new directory for the log files if it doesn't exist -if not os.path.exists("artifacts_three"): - os.makedirs("artifacts_three") +if not os.path.exists("artifacts_five"): + os.makedirs("artifacts_five") # Walk through the current directory for dirpath, dirnames, filenames in os.walk("."): @@ -12,10 +12,10 @@ for dirpath, dirnames, filenames in os.walk("."): if filename.endswith(".log"): # Construct the full file path file_path = os.path.join(dirpath, filename) - # Move the log file to the 'artifacts_three' directory - shutil.move(file_path, "artifacts_three") + # Move the log file to the 'artifacts_five' directory + shutil.move(file_path, "artifacts_five") print( - "Moved all log files into the 'artifacts_three' directory and" + "Moved all log files into the 'artifacts_five' directory and" " deleted their original location." 
) diff --git a/swarms/structs/rearrange.py b/swarms/structs/rearrange.py index d5c24bce..e685d552 100644 --- a/swarms/structs/rearrange.py +++ b/swarms/structs/rearrange.py @@ -1,9 +1,10 @@ from typing import List - from swarms.memory.base_vectordb import BaseVectorDatabase -from swarms.structs.agent import Agent from swarms.structs.base_swarm import BaseSwarm from swarms.utils.loguru_logger import logger +from typing import Optional, Callable, Dict +from swarms.structs.omni_agent_types import Agent +from swarms.structs.agent import Agent class AgentRearrange(BaseSwarm): @@ -30,6 +31,8 @@ class AgentRearrange(BaseSwarm): max_loops: int = 1, verbose: bool = True, memory_system: BaseVectorDatabase = None, + human_in_the_loop: bool = False, + custom_human_in_the_loop: Optional[Callable[[str], str]] = None, *args, **kwargs, ): @@ -41,9 +44,13 @@ class AgentRearrange(BaseSwarm): flow (str, optional): The flow pattern of the tasks. Defaults to None. """ self.agents = {agent.name: agent for agent in agents} - self.flow = flow + self.flow = flow if flow is not None else "" self.verbose = verbose - self.max_loops = max_loops + self.max_loops = max_loops if max_loops > 0 else 1 + self.memory_system = memory_system + self.human_in_the_loop = human_in_the_loop + self.custom_human_in_the_loop = custom_human_in_the_loop + self.swarm_history = {agent.agent_name: [] for agent in agents} # Verbose is True if verbose is True: @@ -54,6 +61,12 @@ class AgentRearrange(BaseSwarm): for agent in self.agents.values(): agent.long_term_memory = memory_system + logger.info( + "AgentRearrange initialized with agents: {}".format( + list(self.agents.keys()) + ) + ) + def add_agent(self, agent: Agent): """ Adds an agent to the swarm. 
@@ -64,6 +77,13 @@ class AgentRearrange(BaseSwarm): logger.info(f"Adding agent {agent.name} to the swarm.") self.agents[agent.name] = agent + def track_history( + self, + agent_name: str, + result: str, + ): + self.swarm_history[agent_name].append(result) + def remove_agent(self, agent_name: str): """ Removes an agent from the swarm. @@ -99,16 +119,23 @@ class AgentRearrange(BaseSwarm): ) agents_in_flow = [] + + # Arrow tasks = self.flow.split("->") + + # For the task in tasks for task in tasks: agent_names = [name.strip() for name in task.split(",")] + + # Loop over the agent names for agent_name in agent_names: - if agent_name not in self.agents: + if agent_name not in self.agents and agent_name != "H": raise ValueError( f"Agent '{agent_name}' is not registered." ) agents_in_flow.append(agent_name) + # If the length of the agents does not equal the length of the agents in flow if len(set(agents_in_flow)) != len(agents_in_flow): raise ValueError( "Duplicate agent names in the flow are not allowed." @@ -117,7 +144,14 @@ class AgentRearrange(BaseSwarm): print("Flow is valid.") return True - def run(self, task: str = None, img: str = None, *args, **kwargs): + def run( + self, + task: str = None, + img: str = None, + custom_tasks: Dict[str, str] = None, + *args, + **kwargs, + ): """ Runs the swarm to rearrange the tasks. 
@@ -134,6 +168,21 @@ class AgentRearrange(BaseSwarm): tasks = self.flow.split("->") current_task = task + # If custom_tasks has the agent's name and task, then combine them + if custom_tasks is not None: + c_agent_name, c_task = next(iter(custom_tasks.items())) + + # Find the position of the custom agent in the tasks list + position = tasks.index(c_agent_name) + + # If there is a previous agent merge its task with the custom tasks + if position > 0: + tasks[position - 1] += "->" + c_task + else: + # If there is no previous agent just insert the custom tasks + tasks.insert(position, c_task) + + # Set the loop counter + loop_count = 0 while loop_count < self.max_loops: for task in tasks: @@ -147,11 +196,27 @@ class AgentRearrange(BaseSwarm): ) results = [] for agent_name in agent_names: - agent = self.agents[agent_name] - result = agent.run( - current_task, img, *args, **kwargs - ) - results.append(result) + if agent_name == "H": + # Human in the loop intervention + if ( + self.human_in_the_loop + and self.custom_human_in_the_loop + ): + current_task = ( + self.custom_human_in_the_loop( + current_task + ) + ) + else: + current_task = input( + "Enter your response:" + ) + else: + agent = self.agents[agent_name] + result = agent.run( + current_task, img, *args, **kwargs + ) + results.append(result) current_task = "; ".join(results) else: @@ -159,11 +224,27 @@ class AgentRearrange(BaseSwarm): logger.info( f"Running agents sequentially: {agent_names}" ) - agent = self.agents[agent_names[0]] - current_task = agent.run( - current_task, img, *args, **kwargs - ) - + agent_name = agent_names[0] + if agent_name == "H": + # Human-in-the-loop intervention + if ( + self.human_in_the_loop + and self.custom_human_in_the_loop + ): + current_task = ( + self.custom_human_in_the_loop( + current_task + ) + ) + else: + current_task = input( + "Enter the next task: " + ) + else: + agent = self.agents[agent_name] + current_task = agent.run( + current_task, img, *args, **kwargs + ) loop_count
+= 1 return current_task