[FEAT][Human in the loop][AgentRearrange]

pull/474/head
Kye 8 months ago
parent 61790fc40d
commit 414adf0af8

@ -0,0 +1,64 @@
from swarms import Agent, AgentRearrange, Anthropic
# Initialize the director agent
director = Agent(
agent_name="Director",
system_prompt="Directs the tasks for the workers",
llm=Anthropic(),
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="director.json",
)
# Initialize worker 1
worker1 = Agent(
agent_name="Worker1",
system_prompt="Generates a transcript for a youtube video on what swarms are",
llm=Anthropic(),
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="worker1.json",
)
# Initialize worker 2
worker2 = Agent(
agent_name="Worker2",
system_prompt="Summarizes the transcript generated by Worker1",
llm=Anthropic(),
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="worker2.json",
)
# Create a list of agents
agents = [director, worker1, worker2]
# Define the flow pattern
flow = "Director -> H -> Worker1 -> Worker2"
# Using AgentRearrange class
agent_system = AgentRearrange(
agents=agents, flow=flow, human_in_the_loop=True
)
output = agent_system.run(
"Create a format to express and communicate swarms of llms in a structured manner for youtube"
)
print(output)

@ -113,6 +113,7 @@ nav:
- Migrate from OpenAI to Swarms in 3 lines of code: "swarms_cloud/migrate_openai.md"
- Getting Started with SOTA Vision Language Models VLM: "swarms_cloud/getting_started.md"
- Enterprise Guide to High-Performance Multi-Agent LLM Deployments: "swarms_cloud/production_deployment.md"
- Under The Hood The Swarm Cloud Serving Infrastructure: "swarms_cloud/architecture.md"
- Swarms Framework [PY]:
- Overview: "swarms/index.md"
- DIY Build Your Own Agent: "diy_your_own_agent.md"

@ -0,0 +1,138 @@
# Under The Hood: The Swarm Cloud Serving Infrastructure
-----------------------------------------------------------------
This blog post delves into the intricate workings of our serving model infrastructure, providing a comprehensive understanding for both users and infrastructure engineers. We'll embark on a journey that starts with an API request and culminates in a response generated by your chosen model, all orchestrated within a multi-cloud environment.
### The Journey of an API Request
1. **The Gateway:** Your API request first arrives at an EC2 instance running SkyPilot, a lightweight controller.
2. **Intelligent Routing:** SkyPilot, wielding its decision-making prowess, analyzes the request and identifies the most suitable GPU in our multi-cloud setup. Factors like resource availability, latency, and cost might influence this choice.
3. **Multi-Cloud Agility:** Based on the chosen cloud provider (AWS or Azure), SkyPilot seamlessly directs the request to the appropriate containerized model residing in a sky clusters cluster. Here's where the magic of cloud-agnostic deployments comes into play.
### Unveiling the Architecture
Let's dissect the technical architecture behind this process:
- **SkyPilot (EC2 Instance):** This lightweight controller, deployed on an EC2 instance, acts as the central hub for orchestrating requests and routing them to suitable model instances.
- **Swarm Cloud Repositories:** Each model resides within its own dedicated folder on the Swarms Cloud GitHub repository (<https://github.com/kyegomez/swarms-cloud>). Here, you'll find a folder structure like this:
```
servers/
<model_name_1>/
sky-serve.yaml # Deployment configuration file
<model_name_2>/
sky-serve.yaml
...
```
- **SkyServe Deployment Tool:** This is the workhorse responsible for deploying models within sky clusters clusters. Each model's folder contains a `sky-serve.yaml` file that dictates the deployment configuration.
### Infrastructure Engineer's Toolkit: Commands for Model Deployment
Here's a breakdown of the `sky serve` command and its subcommands:
- `sky serve -h`: Displays the help message for the `sky serve` CLI tool.
**Commands:**
- `sky serve up yaml.yaml -n --cloud aws/azure`: This command deploys a SkyServe service based on the provided `yaml.yaml` configuration file. The `-n` flag indicates a new deployment, and the `--cloud` flag specifies the target cloud platform (AWS or Azure).
**Additional Commands:**
- `sky serve update`: Updates a running SkyServe service.
- `sky serve status`: Shows the status of deployed SkyServe services.
- `sky serve down`: Tears down (stops and removes) a SkyServe service.
- `sky serve logs`: Tails the logs of a running SkyServe service, providing valuable insights into its operation.
By leveraging these commands, infrastructure engineers can efficiently manage the deployment and lifecycle of models within the multi-cloud environment.
**Building the Cluster and Accessing the Model:**
When you deploy a model using `sky serve up`, SkyServe triggers the building of a sky clusters cluster, if one doesn't already exist. Once the deployment is complete, SkyServe provides you with an endpoint URL for interacting with the model. This URL allows you to send requests to the deployed model and receive its predictions.
### Understanding the `sky-serve.yaml` Configuration
The `sky-serve.yaml` file plays a crucial role in defining the deployment parameters for your model. This file typically includes properties such as:
- **Image:** Specifies the Docker image containing your model code and dependencies.
- **Replicas:** Defines the number of model replicas to be deployed in the Swarm cluster. This allows for load balancing and fault tolerance.
- **Resources:** Sets memory and CPU resource constraints for the deployed model containers.
- **Networking:** Configures network settings for communication within the sky clusters and with the outside world.
**Benefits of Our Infrastructure:**
- **Multi-Cloud Flexibility:** Deploy models seamlessly across AWS and Azure, taking advantage of whichever platform best suits your needs.
- **Scalability:** Easily scale model deployments up or down based on traffic demands.
- **Cost Optimization:** The intelligent routing by SkyPilot helps optimize costs by utilizing the most cost-effective cloud resources.
- **Simplified Management:** Manage models across clouds with a single set of commands using `sky serve`.
### Deep Dive: Technical Architecture
**Cloud Considerations:**
Our multi-cloud architecture offers several advantages, but it also introduces complexities that need to be addressed. Here's a closer look at some key considerations:
- **Cloud Provider APIs and SDKs:** SkyPilot interacts with the APIs and SDKs of the chosen cloud provider (AWS or Azure) to manage resources like virtual machines, storage, and networking. Infrastructure engineers need to be familiar with the specific APIs and SDKs for each cloud platform to ensure smooth operation and troubleshooting.
- **Security:** Maintaining consistent security across different cloud environments is crucial. This involves aspects like IAM (Identity and Access Management) configuration, network segmentation, and encryption of sensitive data at rest and in transit. Infrastructure engineers need to implement robust security measures tailored to each cloud provider's offerings.
- **Network Connectivity:** Establishing secure and reliable network connectivity between SkyPilot (running on EC2), sky clusters clusters (deployed on cloud VMs), and your client applications is essential. This might involve setting up VPN tunnels or utilizing cloud-native networking solutions offered by each provider.
- **Monitoring and Logging:** Monitoring the health and performance of SkyPilot, sky clusters clusters, and deployed models across clouds is critical for proactive issue identification and resolution. Infrastructure engineers can leverage cloud provider-specific monitoring tools alongside centralized logging solutions for comprehensive oversight.
**sky clusters Clusters**
sky clusters is a container orchestration platform that facilitates the deployment and management of containerized applications, including your machine learning models. When you deploy a model with `sky serve up`, SkyPilot launches an node with:
- **Provision Resources:** SkyPilot requests resources from the chosen cloud provider (e.g., VMs with GPUs) to create a sky clusters cluster if one doesn't already exist.
- **Deploy Containerized Models:** SkyPilot leverages the `sky-serve.yaml` configuration to build Docker images containing your model code and dependencies. These images are then pushed to a container registry (e.g., Docker Hub) and deployed as containers within the Swarm cluster.
- **Load Balancing and Service Discovery:** sky clusters provides built-in load balancing capabilities to distribute incoming requests across multiple model replicas, ensuring high availability and performance. Additionally, service discovery mechanisms allow models to find each other and communicate within the cluster.
**SkyPilot - The Orchestrator**
SkyPilot, the lightweight controller running on an EC2 instance, plays a central role in this infrastructure. Here's a deeper look at its functionalities:
- **API Gateway Integration:** SkyPilot can be integrated with your API gateway or service mesh to receive incoming requests for model predictions.
- **Request Routing:** SkyPilot analyzes the incoming request, considering factors like model compatibility, resource availability, and latency. Based on this analysis, SkyPilot selects the most suitable model instance within the appropriate sky clusters cluster.
- **Cloud Provider Interaction:** SkyPilot interacts with the chosen cloud provider's APIs to manage resources required for the sky clusters cluster and model deployment.
- **Model Health Monitoring:** SkyPilot can be configured to monitor the health and performance of deployed models. This might involve collecting metrics like model response times, resource utilization, and error rates.
- **Scalability Management:** Based on pre-defined policies or real-time traffic patterns, SkyPilot can trigger the scaling of model deployments (adding or removing replicas) within the sky clusters cluster.
**Advanced Considerations**
This blog post has provided a foundational understanding of our serving model infrastructure. For infrastructure engineers seeking a deeper dive, here are some additional considerations:
- **Container Security:** Explore container image scanning for vulnerabilities, enforcing least privilege principles within container runtime environments, and utilizing secrets management solutions for secure access to sensitive data.
- **Model Versioning and Rollbacks:** Implement a model versioning strategy to track changes and facilitate rollbacks to previous versions if necessary.
- **A/B Testing:** Integrate A/B testing frameworks to evaluate the performance of different model versions and configurations before full-scale deployment.
- **Auto-Scaling with Cloud Monitoring:** Utilize cloud provider-specific monitoring services like Amazon CloudWatch or Azure Monitor to trigger auto-scaling of sky clusters clusters based on predefined metrics.
By understanding these technical aspects and considerations, infrastructure engineers can effectively manage and optimize our multi-cloud serving model infrastructure.
### Conclusion
This comprehensive exploration has shed light on the intricate workings of our serving model infrastructure. We've covered the journey of an API request, delved into the technical architecture with a focus on cloud considerations, sky clusters clusters, and SkyPilot's role as the orchestrator. We've also explored advanced considerations for infrastructure engineers seeking to further optimize and secure this multi-cloud environment.
This understanding empowers both users and infrastructure engineers to leverage this technology effectively for deploying and managing your machine learning models at scale.

@ -135,11 +135,11 @@ rearranger = AgentRearrange(
verbose=True,
)
# image = "ear.png"
image = "ear_4.jpg"
# Run the rearranger
out = rearranger(
"Diagnose the image and provide a treatment plan for the patient",
# image,
"Diagnose this medical image, it's an ear canal, be precise",
image,
)
print(out)

@ -2,8 +2,8 @@ import os
import shutil
# Create a new directory for the log files if it doesn't exist
if not os.path.exists("artifacts_three"):
os.makedirs("artifacts_three")
if not os.path.exists("artifacts_five"):
os.makedirs("artifacts_five")
# Walk through the current directory
for dirpath, dirnames, filenames in os.walk("."):
@ -12,10 +12,10 @@ for dirpath, dirnames, filenames in os.walk("."):
if filename.endswith(".log"):
# Construct the full file path
file_path = os.path.join(dirpath, filename)
# Move the log file to the 'artifacts_three' directory
shutil.move(file_path, "artifacts_three")
# Move the log file to the 'artifacts_five' directory
shutil.move(file_path, "artifacts_five")
print(
"Moved all log files into the 'artifacts_three' directory and"
"Moved all log files into the 'artifacts_five' directory and"
" deleted their original location."
)

@ -1,9 +1,10 @@
from typing import List
from swarms.memory.base_vectordb import BaseVectorDatabase
from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
from swarms.utils.loguru_logger import logger
from typing import Optional, Callable, Dict
from swarms.structs.omni_agent_types import Agent
from swarms.structs.agent import Agent
class AgentRearrange(BaseSwarm):
@ -30,6 +31,8 @@ class AgentRearrange(BaseSwarm):
max_loops: int = 1,
verbose: bool = True,
memory_system: BaseVectorDatabase = None,
human_in_the_loop: bool = False,
custom_human_in_the_loop: Optional[Callable[[str], str]] = None,
*args,
**kwargs,
):
@ -41,9 +44,13 @@ class AgentRearrange(BaseSwarm):
flow (str, optional): The flow pattern of the tasks. Defaults to None.
"""
self.agents = {agent.name: agent for agent in agents}
self.flow = flow
self.flow = flow if flow is not None else ""
self.verbose = verbose
self.max_loops = max_loops
self.max_loops = max_loops if max_loops > 0 else 1
self.memory_system = memory_system
self.human_in_the_loop = human_in_the_loop
self.custom_human_in_the_loop = custom_human_in_the_loop
self.swarm_history = {agent.agent_name: [] for agent in agents}
# Verbose is True
if verbose is True:
@ -54,6 +61,12 @@ class AgentRearrange(BaseSwarm):
for agent in self.agents.values():
agent.long_term_memory = memory_system
logger.info(
"AgentRearrange initialized with agents: {}".format(
list(self.agents.keys())
)
)
def add_agent(self, agent: Agent):
"""
Adds an agent to the swarm.
@ -64,6 +77,13 @@ class AgentRearrange(BaseSwarm):
logger.info(f"Adding agent {agent.name} to the swarm.")
self.agents[agent.name] = agent
def track_history(
self,
agent_name: str,
result: str,
):
self.swarm_history[agent_name].append(result)
def remove_agent(self, agent_name: str):
"""
Removes an agent from the swarm.
@ -99,16 +119,23 @@ class AgentRearrange(BaseSwarm):
)
agents_in_flow = []
# Arrow
tasks = self.flow.split("->")
# For the task in tasks
for task in tasks:
agent_names = [name.strip() for name in task.split(",")]
# Loop over the agent names
for agent_name in agent_names:
if agent_name not in self.agents:
if agent_name not in self.agents and agent_name != "H":
raise ValueError(
f"Agent '{agent_name}' is not registered."
)
agents_in_flow.append(agent_name)
# If the length of the agents does not equal the length of the agents in flow
if len(set(agents_in_flow)) != len(agents_in_flow):
raise ValueError(
"Duplicate agent names in the flow are not allowed."
@ -117,7 +144,14 @@ class AgentRearrange(BaseSwarm):
print("Flow is valid.")
return True
def run(self, task: str = None, img: str = None, *args, **kwargs):
def run(
self,
task: str = None,
img: str = None,
custom_tasks: Dict[str, str] = None,
*args,
**kwargs,
):
"""
Runs the swarm to rearrange the tasks.
@ -134,6 +168,21 @@ class AgentRearrange(BaseSwarm):
tasks = self.flow.split("->")
current_task = task
# If custom_tasks have the agents name and tasks then combine them
if custom_tasks is not None:
c_agent_name, c_task = next(iter(custom_tasks.items()))
# Find the position of the custom agent in the tasks list
position = tasks.index(c_agent_name)
# If there is a prebois agent merge its task with the custom tasks
if position > 0:
tasks[position - 1] += "->" + c_task
else:
# If there is no prevous agent just insert the custom tasks
tasks.insert(position, c_task)
# Set the loop counter
loop_count = 0
while loop_count < self.max_loops:
for task in tasks:
@ -147,11 +196,27 @@ class AgentRearrange(BaseSwarm):
)
results = []
for agent_name in agent_names:
agent = self.agents[agent_name]
result = agent.run(
current_task, img, *args, **kwargs
)
results.append(result)
if agent_name == "H":
# Human in the loop intervention
if (
self.human_in_the_loop
and self.custom_human_in_the_loop
):
current_task = (
self.custom_human_in_the_loop(
current_task
)
)
else:
current_task = input(
"Enter your response:"
)
else:
agent = self.agents[agent_name]
result = agent.run(
current_task, img, *args, **kwargs
)
results.append(result)
current_task = "; ".join(results)
else:
@ -159,11 +224,27 @@ class AgentRearrange(BaseSwarm):
logger.info(
f"Running agents sequentially: {agent_names}"
)
agent = self.agents[agent_names[0]]
current_task = agent.run(
current_task, img, *args, **kwargs
)
agent_name = agent_names[0]
if agent_name == "H":
# Human-in-the-loop intervention
if (
self.human_in_the_loop
and self.custom_human_in_the_loop
):
current_task = (
self.custom_human_in_the_loop(
current_task
)
)
else:
current_task = input(
"Enter the next task: "
)
else:
agent = self.agents[agent_name]
current_task = agent.run(
current_task, img, *args, **kwargs
)
loop_count += 1
return current_task

Loading…
Cancel
Save