[DOCS][Cleanup][Playground Example]

pull/484/head
Kye Gomez 7 months ago
parent 2ad7feb9ae
commit 54f8e0d062

@ -103,9 +103,14 @@ nav:
- Usage Examples:
- Build an Agent: "diy_your_own_agent.md"
- Build an Agent with tools: "examples/tools_agents.md"
# Add examples on building agents
# Add multiple blogs on orchestrating agents with different types of frameworks
- Why does Swarms Exist?:
- Why Swarms? Orchestrating Agents for Enterprise Automation: "why.md"
- Limitations of Individual Agents: "limits_of_individual_agents.md"
- References:
- Agent Glossary: "swarms/glossary.md"
- List of The Best Multi-Agent Papers: "swarms/papers.md"
- Swarms Cloud API:
- Overview: "swarms_cloud/main.md"
- Available Models: "swarms_cloud/available_models.md"

@ -0,0 +1,48 @@
# Glossary of Terms
**Agent**:
An LLM (Large Language Model) equipped with tools and memory, operating with a specific objective in a loop. An agent can perform tasks, interact with other agents, and utilize external tools and memory systems to achieve its goals.
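A minimal sketch of this loop, using the `OpenAIChat` wrapper shown elsewhere in this commit (the agent name and prompt here are illustrative):

```python
from swarms import Agent, OpenAIChat

# An LLM wrapped with a system prompt, run for a bounded number of loops
agent = Agent(
    agent_name="Researcher",
    system_prompt="Answer questions concisely and explain your reasoning.",
    llm=OpenAIChat(),
    max_loops=1,
)

print(agent.run("Explain in one sentence what an agent is."))
```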
**Swarms**:
A group of more than two agents working together and communicating to accomplish a shared objective. Swarms enable complex, collaborative tasks that leverage the strengths of multiple agents.
**Tool**:
A Python function that is converted into a function-call schema the agent can invoke, allowing agents to perform specific actions or access external resources. Tools extend an agent's capabilities with specialized functionality.
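For example, a plain Python function with type hints and a docstring can be registered as a tool; the function below is a stand-in, and the `tools=[...]` parameter follows the agent schema added later in this commit:

```python
from swarms import Agent, OpenAIChat

def fetch_weather(city: str) -> str:
    """Return a short weather report for a city (stubbed for illustration)."""
    return f"The weather in {city} is sunny, 24 C."

agent = Agent(
    agent_name="Weather-Agent",
    system_prompt="Use your tools to answer weather questions.",
    llm=OpenAIChat(),
    max_loops=1,
    tools=[fetch_weather],  # the function is converted into a function call
)

agent.run("What is the weather in Berlin?")
```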
**Memory System**:
A system for managing information retrieval and storage, often implemented as a Retrieval-Augmented Generation (RAG) system or a memory vector database. Memory systems enable agents to recall previous interactions, store new information, and improve decision-making based on historical data.
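A rough sketch of wiring a vector database in as long-term memory; the `ChromaDB` wrapper, its import path, and its constructor arguments are assumptions based on the playground examples touched by this commit:

```python
from swarms import Agent, OpenAIChat
# Import path is an assumption; any RAG wrapper exposing add()/query() should work
from playground.memory.chromadb_example import ChromaDB

memory = ChromaDB(output_dir="agent_memory")  # illustrative persistence location

agent = Agent(
    agent_name="Support-Agent",
    system_prompt="Answer using prior conversations when relevant.",
    llm=OpenAIChat(),
    max_loops=1,
    long_term_memory=memory,
)

agent.run("What did we discuss about deployment last week?")
```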
**LLM (Large Language Model)**:
A type of AI model designed to understand and generate human-like text. LLMs, such as GPT-3 or GPT-4, are used as the core computational engine for agents.
**System Prompt**:
A predefined prompt that sets the context and instructions for an agent's task. The system prompt guides the agent's behavior and response generation.
**Max Loops**:
The maximum number of iterations an agent will perform to complete its task. This parameter helps control the extent of an agent's processing and ensures tasks are completed efficiently.
**Dashboard**:
A user interface that provides real-time monitoring and control over the agents and their activities. Dashboards can display agent status, logs, and performance metrics.
**Streaming On**:
A setting that enables agents to stream their output incrementally, providing real-time feedback as they process tasks. This feature is useful for monitoring progress and making adjustments on the fly.
**Verbose**:
A setting that controls the level of detail in an agent's output and logging. When verbose mode is enabled, the agent provides more detailed information about its operations and decisions.
**Multi-modal**:
The capability of an agent to process and integrate multiple types of data, such as text, images, and audio. Multi-modal agents can handle more complex tasks that require diverse inputs.
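A sketch of a multi-modal agent using the `GPT4VisionAPI` model class from this repo; the exact way the image is passed to `run` may differ between versions, so treat the call below as illustrative:

```python
from swarms import Agent
from swarms.models.gpt4_vision_api import GPT4VisionAPI

vision_llm = GPT4VisionAPI(
    model_name="gpt-4-1106-vision-preview",
    max_tokens=1000,
)

agent = Agent(
    agent_name="Radiology-Agent",
    system_prompt="Describe medical images precisely.",
    llm=vision_llm,
    max_loops=1,
    multi_modal=True,
)

# Image path is a placeholder; the argument name/position is an assumption
agent.run("Describe this ear canal image, be precise", "ear_4.jpg")
```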
**Autosave**:
A feature that automatically saves the agent's state and progress at regular intervals. Autosave helps prevent data loss and allows for recovery in case of interruptions.
**Flow**:
The predefined sequence in which agents in a swarm interact and process tasks. The flow ensures that each agent's output is appropriately passed to the next agent, facilitating coordinated efforts.
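For instance, the flow string below routes the diagnoser's output into the treatment agent, mirroring the `"D -> T"` flow used in the playground API example in this commit (prompts are shortened here):

```python
from swarms import Agent, AgentRearrange, OpenAIChat

diagnoser = Agent(
    agent_name="D",
    system_prompt="Diagnose the case described in the task.",
    llm=OpenAIChat(),
    max_loops=1,
)
treatment = Agent(
    agent_name="T",
    system_prompt="Propose a treatment plan for the diagnosis you receive.",
    llm=OpenAIChat(),
    max_loops=1,
)

# "D -> T": D runs first, and its output is passed to T
swarm = AgentRearrange(
    agents=[diagnoser, treatment],
    flow="D -> T",
    max_loops=1,
    verbose=True,
)

print(swarm.run("Patient reports ear pain and reduced hearing."))
```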
**Long Term Memory**:
A component of the memory system that retains information over extended periods, enabling agents to recall and utilize past interactions and experiences.
**Output Schema**:
A structured format for the output generated by agents, often defined using data models like Pydantic's BaseModel. Output schemas ensure consistency and clarity in the information produced by agents.
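For example, the diagnosis output used in the playground API in this commit is described with a Pydantic model like the following:

```python
from pydantic import BaseModel, Field

class DiagnosisSchema(BaseModel):
    image_name: str = Field(..., description="Name of the image that was diagnosed")
    task: str = Field(..., description="Task performed on the image")
    diagnosis: str = Field(..., description="The resulting diagnosis")
```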
By understanding these terms, you can effectively build and orchestrate agents and swarms, leveraging their capabilities to perform complex, collaborative tasks.

@ -0,0 +1,3 @@
# awesome-multi-agent-papers
An awesome list of multi-agent papers that show you various swarm architectures and much more. [Get started](https://github.com/kyegomez/awesome-multi-agent-papers)

@ -1,16 +1,34 @@
import os
from dotenv import load_dotenv
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from swarms import Agent
from swarms.models import Anthropic
from swarms.models import OpenAIChat
from swarms.models.gpt4_vision_api import GPT4VisionAPI
from swarms.structs.rearrange import AgentRearrange
from fastapi import FastAPI
from typing import Optional, List, Dict, Any
# Load the environment variables
load_dotenv()
# LLM
llm = GPT4VisionAPI(
model_name="gpt-4-1106-vision-preview",
max_tokens=3000,
)
openai = OpenAIChat(
openai_api_key=os.getenv("OPENAI_API_KEY"),
max_tokens=3000,
)
# Setup the FastAPI app
app = FastAPI()
def DIAGNOSIS_SYSTEM_PROMPT() -> str:
return """
@ -60,86 +78,103 @@ def TREATMENT_PLAN_SYSTEM_PROMPT() -> str:
"""
class DiagnosisSchema(BaseModel):
image_name: str = Field(
...,
title="Image Name",
description="The name of the image to be diagnosed",
)
task: str = Field(
...,
title="Task",
description="The task to be performed on the image",
)
diagnosis: str = Field(
..., title="Diagnosis", description="The diagnosis of the image"
class LLMConfig(BaseModel):
model_name: str
max_tokens: int
class AgentConfig(BaseModel):
agent_name: str
system_prompt: str
llm: LLMConfig
max_loops: int
autosave: bool
dashboard: bool
class AgentRearrangeConfig(BaseModel):
agents: List[AgentConfig]
flow: str
max_loops: int
verbose: bool
class AgentRunResult(BaseModel):
agent_name: str
output: Dict[str, Any]
tokens_generated: int
class RunAgentsResponse(BaseModel):
results: List[AgentRunResult]
total_tokens_generated: int
class AgentRearrangeResponse(BaseModel):
results: List[AgentRunResult]
total_tokens_generated: int
class RunConfig(BaseModel):
task: str = Field(..., title="The task to run")
flow: str = "D -> T"
image: Optional[str] = None # Optional image path as a string
max_loops: Optional[int] = 1
@app.get("/v1/health")
async def health_check():
return JSONResponse(content={"status": "healthy"})
@app.get("/v1/models_available")
async def models_available():
available_models = {
"models": [
{"name": "gpt-4-1106-vision-preview", "type": "vision"},
{"name": "openai-chat", "type": "text"}
]
}
return JSONResponse(content=available_models)
@app.get("/v1/swarm/completions")
async def run_agents(run_config: RunConfig):
# Diagnoser agent
diagnoser = Agent(
# agent_name="Medical Image Diagnostic Agent",
agent_name="D",
system_prompt=DIAGNOSIS_SYSTEM_PROMPT(),
llm=llm,
max_loops=1,
autosave=True,
dashboard=True,
)
class TreatMentSchema(BaseModel):
image_name: str = Field(
...,
title="Image Name",
description="The name of the image to be treated",
)
task: str = Field(
...,
title="Task",
description="The task to be performed on the image",
# Agent 2 the treatment plan provider
treatment_plan_provider = Agent(
# agent_name="Medical Treatment Recommendation Agent",
agent_name="T",
system_prompt=TREATMENT_PLAN_SYSTEM_PROMPT(),
llm=openai,
max_loops=1,
autosave=True,
dashboard=True,
)
treatment: str = Field(
..., title="Treatment", description="The treatment of the image"
)
# LLM
llm = GPT4VisionAPI(
openai_api_key=os.getenv("OPENAI_API_KEY"),
model_name="gpt-4o",
max_tokens=1000,
)
# Anthropic
anthropic = Anthropic(
anthropic_api_key=os.getenv("ANTHROPIC_API_KEY"),
)
# Diagnoser agent
diagnoser = Agent(
# agent_name="Medical Image Diagnostic Agent",
agent_name="D",
system_prompt=DIAGNOSIS_SYSTEM_PROMPT(),
llm=llm,
max_loops=1,
autosave=True,
dashboard=True,
)
# Agent 2 the treatment plan provider
treatment_plan_provider = Agent(
# agent_name="Medical Treatment Recommendation Agent",
agent_name="T",
system_prompt=TREATMENT_PLAN_SYSTEM_PROMPT(),
llm=anthropic,
max_loops=1,
autosave=True,
dashboard=True,
)
# Agent 3 the re-arranger
rearranger = AgentRearrange(
agents=[diagnoser, treatment_plan_provider],
flow=run_config.flow,
max_loops=run_config.max_loops,
verbose=True,
)
# Agent 3 the re-arranger
rearranger = AgentRearrange(
agents=[diagnoser, treatment_plan_provider],
flow="D -> T",
max_loops=1,
verbose=True,
)
# Run the rearranger
out = rearranger(
run_config.task,
image=run_config.image,
)
return JSONResponse(content=out)
image = "ear_4.jpg"
# Run the rearranger
out = rearranger(
"Diagnose this medical image, it's an ear canal, be precise",
image,
)
print(out)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

Binary file not shown.


@ -17,7 +17,7 @@ from playground.demos.plant_biologist_swarm.prompts import (
treatment_recommender_agent,
)
from swarms import Agent, GPT4VisionAPI
from swarms import Agent, Fuyu
# Load the OpenAI API key from the .env file
@ -28,9 +28,9 @@ api_key = os.environ.get("OPENAI_API_KEY")
# llm = llm,
llm = GPT4VisionAPI(
llm = Fuyu(
max_tokens=4000,
model_name="gpt-4o",
openai_api_key=api_key,
)
# Initialize Diagnoser Agent

@ -1,25 +0,0 @@
hey guys, we out here testing out swarms which is a multi-modal agent
framework which potentially makes all the agents work in a single pot
for instance take an empty pot and place all the known agents in that
pot and output a well structured answer out of it
that's basically it, we believe that a multi-agent framework beats a single
agent framework which is not really rocket science
ight first we gotta make sure our env is clean, install python3-pip,
this runs on python3.10
our current version of swarms==4.1.0
make sure you in a virtual env or conda
just do
$ python3 -m venv ~/.venv
$ source ~/.venv/bin/activate
then boom we in a virtual env LFG
now for the best we install swarms
$ pip3 install --upgrade swarms==4.1.0

@ -32,7 +32,7 @@ agent = Agent(
max_loops=4,
autosave=True,
dashboard=True,
long_term_memory=ChromaDB(),
long_term_memory=chromadb,
)
# Run the workflow on a task

@ -1,22 +0,0 @@
from swarms.structs.hiearchical_swarm import HierarchicalSwarm
# Retrieve your API key from the environment or replace with your actual key
api_key = "sksdsds"
# Initialize HierarchicalSwarm with your API key
swarm = HierarchicalSwarm(openai_api_key=api_key)
# Define an objective
objective = """
Please make a web GUI for using HTTP API server.
The name of it is HierarchicalSwarm.
You can check the server code at ./main.py.
The server is served on localhost:8000.
Users should be able to write text input as 'query' and url array as 'files', and check the response.
Users input form should be delivered in JSON format.
I want it to have neumorphism-style. Serve it on port 4500.
"""
# Run HierarchicalSwarm
swarm.run(objective)

@ -1,11 +0,0 @@
from swarms import DialogueSimulator, Worker
def select_next_speaker(step: int, agents) -> int:
idx = (step) % len(agents)
return idx
debate = DialogueSimulator(Worker, select_next_speaker)
debate.run()

@ -1,4 +1,4 @@
from swarms import Agent, AgentRearrange, rearrange, Anthropic
from swarms import Agent, AgentRearrange, OpenAIChat
# Initialize the director agent
@ -6,7 +6,7 @@ from swarms import Agent, AgentRearrange, rearrange, Anthropic
director = Agent(
agent_name="Director",
system_prompt="Directs the tasks for the workers",
llm=Anthropic(),
llm=OpenAIChat(),
max_loops=1,
dashboard=False,
streaming_on=True,
@ -22,7 +22,7 @@ director = Agent(
worker1 = Agent(
agent_name="Worker1",
system_prompt="Generates a transcript for a youtube video on what swarms are",
llm=Anthropic(),
llm=OpenAIChat(),
max_loops=1,
dashboard=False,
streaming_on=True,
@ -37,7 +37,7 @@ worker1 = Agent(
worker2 = Agent(
agent_name="Worker2",
system_prompt="Summarizes the transcript generated by Worker1",
llm=Anthropic(),
llm=OpenAIChat(),
max_loops=1,
dashboard=False,
streaming_on=True,
@ -62,11 +62,11 @@ output = agent_system.run(
print(output)
# Using rearrange function
output = rearrange(
agents,
flow,
"Create a format to express and communicate swarms of llms in a structured manner for youtube",
)
# # Using rearrange function
# output = rearrange(
# agents,
# flow,
# "Create a format to express and communicate swarms of llms in a structured manner for youtube",
# )
print(output)
# print(output)

@ -2,16 +2,30 @@ from swarms import OpenAIChat
from swarms.structs.agent import Agent
from swarms.structs.message_pool import MessagePool
# Create agents
agent1 = Agent(llm=OpenAIChat(), agent_name="agent1")
agent2 = Agent(llm=OpenAIChat(), agent_name="agent2")
agent3 = Agent(llm=OpenAIChat(), agent_name="agent3")
# Create moderator agent
moderator = Agent(agent_name="moderator")
# Create a list of agents
agents = [agent1, agent2, agent3]
# Create a message pool with 5 turns
message_pool = MessagePool(agents=agents, moderator=moderator, turns=5)
# Add messages to the message pool
message_pool.add(agent=agent1, content="Hello, agent2!", turn=1)
message_pool.add(agent=agent2, content="Hello, agent1!", turn=1)
message_pool.add(agent=agent3, content="Hello, agent1!", turn=1)
# Get all messages in the message pool
message_pool.get_all_messages()
# Get visible messages for agent1 in turn 1
message_pool.get_visible_messages(agent=agent1, turn=1)
# Get visible messages for agent2 in turn 1
message_pool.get_visible_messages(agent=agent2, turn=1)

@ -1,48 +0,0 @@
from swarms.models import OpenAIChat
from swarms.structs.multi_agent_debate import (
MultiAgentDebate,
select_speaker,
)
from swarms.workers.worker import Worker
llm = OpenAIChat()
worker1 = Worker(
llm=llm,
ai_name="Bumble Bee",
ai_role="Worker in a swarm",
human_in_the_loop=False,
temperature=0.5,
)
worker2 = Worker(
llm=llm,
ai_name="Optimus Prime",
ai_role="Worker in a swarm",
human_in_the_loop=False,
temperature=0.5,
)
worker3 = Worker(
llm=llm,
ai_name="Megatron",
ai_role="Worker in a swarm",
human_in_the_loop=False,
temperature=0.5,
)
# init agents
agents = [worker1, worker2, worker3]
# Initialize multi-agent debate with the selection function
debate = MultiAgentDebate(agents, select_speaker)
# Run task
task = (
"What were the winning boston marathon times for the past 5 years"
" (ending in 2022)? Generate a table of the year, name, country"
" of origin, and times."
)
results = debate.run(task, max_iters=4)
# Print results
for result in results:
print(f"Agent {result['agent']} responded: {result['response']}")

@ -1,7 +1,5 @@
import os
from dotenv import load_dotenv
from swarms.structs import Agent, OpenAIChat, Task
# Load the environment variables

@ -48,7 +48,6 @@ Pillow = "10.3.0"
psutil = "*"
sentry-sdk = "*"
python-dotenv = "*"
opencv-python-headless = "*"
PyYAML = "*"
docstring_parser = "0.16"

@ -0,0 +1,25 @@
from swarms import Agent, OpenAIChat, SwarmNetwork
# Create an instance of Agent
agent1 = Agent(
agent_name="Covid-19-Chat", # Name of the agent
agent_description="This agent provides information about COVID-19 symptoms.", # Description of the agent
llm=OpenAIChat(), # Language model used by the agent
max_loops="auto", # Maximum number of loops the agent can run
autosave=True, # Whether to automatically save the agent's state
verbose=True, # Whether to print verbose output
stopping_condition="finish", # Condition for stopping the agent
)
agents = [agent1] # List of agents (add more agents as needed)
swarm_name = "HealthSwarm" # Name of the swarm
swarm_description = "A swarm of agents providing health-related information." # Description of the swarm
# Create an instance of SwarmNetwork with API enabled
agent_api = SwarmNetwork(
swarm_name, swarm_description, agents, api_on=True
)
# Run the agent API
agent_api.run()

@ -11,14 +11,6 @@ import requests
from PIL import Image
from termcolor import colored
try:
import cv2
except ImportError:
print(
"Error importing cv2 try installing it with `pip install"
" opencv-python`"
)
class BaseMultiModalModel:
"""
@ -148,42 +140,6 @@ class BaseMultiModalModel:
image_pil = Image.open(img)
return image_pil
def process_video(
self,
video_path: str = None,
type_img: str = ".jpg",
*args,
**kwargs,
):
"""Process a video
Args:
video_path (str, optional): _description_. Defaults to None.
type_img (str, optional): _description_. Defaults to ".jpg".
*args: _description_.
**kwargs: _description_.
"""
try:
video = cv2.VideoCapture(video_path)
base64Frames = []
while video.isOpened():
success, frame = video.read()
if not success:
break
_, buffer = cv2.imencode(type_img, frame)
base64Frames.append(
base64.b64encode(buffer).decode("utf-8")
)
video.release()
print(len(base64Frames), "frames read")
return base64Frames
except Exception as error:
print(f"Error processing video {error} try again")
raise error
def clear_chat_history(self):
"""Clear the chat history"""
self.chat_history = []

@ -11,14 +11,6 @@ from termcolor import colored
from swarms.utils.loguru_logger import logger
from swarms.models.base_multimodal_model import BaseMultiModalModel
try:
import cv2
except ImportError:
print(
"OpenCV not installed. Please install OpenCV to use this" " model."
)
raise ImportError
# Load environment variables
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
@ -222,92 +214,6 @@ class GPT4VisionAPI(BaseMultiModalModel):
for chunk in content:
print(chunk)
def process_video(self, video: str = None):
"""
Process a video into a list of base64 frames
Parameters
----------
video : str
The path to the video file
Returns
-------
base64_frames : list
A list of base64 frames
Examples
--------
>>> from swarms.models import GPT4VisionAPI
>>> llm = GPT4VisionAPI()
>>> video = "video.mp4"
>>> base64_frames = llm.process_video(video)
"""
video = cv2.VideoCapture(video)
base64_frames = []
while video.isOpened():
success, frame = video.read()
if not success:
break
_, buffer = cv2.imencode(".jpg", frame)
base64_frames.append(base64.b64encode(buffer).decode("utf-8"))
video.release()
print(len(base64_frames), "frames read.")
return base64_frames
def run_with_video(
self,
task: str = None,
video: str = None,
*args,
**kwargs,
):
prompt = self.video_prompt(self.process_video(video))
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {openai_api_key}",
}
payload = {
"model": self.model_name,
"messages": [
{
"role": "system",
"content": [self.system_prompt],
},
{
"role": "user",
"content": [
(task,), # task
*map(
lambda x: {"image": x, "resize": 768},
prompt[0::50],
),
],
},
],
"max_tokens": self.max_tokens,
}
response = requests.post(
self.openai_proxy,
headers=headers,
json=payload,
)
out = response.json()
content = out["choices"][0]["message"]["content"]
if self.streaming_enabled:
content = self.stream_response(content)
if self.beautify:
content = colored(content, "cyan")
print(content)
else:
print(content)
def __call__(
self,
task: Optional[str] = None,

@ -1,9 +1,4 @@
import os
import cv2
import numpy as np
import requests
import torch
from PIL import Image
from transformers import AutoModelForVision2Seq, AutoProcessor
@ -18,25 +13,12 @@ def is_overlapping(rect1, rect2):
class Kosmos(BaseMultiModalModel):
"""
Kosmos model by Yen-Chun Shieh
Parameters
----------
model_name : str
Path to the pretrained model
Examples
--------
>>> kosmos = Kosmos()
>>> kosmos("Hello, my name is", "path/to/image.png")
"""
def __init__(
self,
model_name="ydshieh/kosmos-2-patch14-224",
max_new_tokens: int = 64,
verbose: bool = False,
*args,
**kwargs,
):
@ -121,201 +103,6 @@ class Kosmos(BaseMultiModalModel):
task = "<grounding> Describe this image in detail"
self.run(task, image_url)
def draw_entity_boxes_on_image(
image, entities, show=False, save_path=None
):
"""_summary_
Args:
image (_type_): image or image path
collect_entity_location (_type_): _description_
"""
if isinstance(image, Image.Image):
image_h = image.height
image_w = image.width
image = np.array(image)[:, :, [2, 1, 0]]
elif isinstance(image, str):
if os.path.exists(image):
pil_img = Image.open(image).convert("RGB")
image = np.array(pil_img)[:, :, [2, 1, 0]]
image_h = pil_img.height
image_w = pil_img.width
else:
raise ValueError(f"invaild image path, {image}")
elif isinstance(image, torch.Tensor):
# pdb.set_trace()
image_tensor = image.cpu()
reverse_norm_mean = torch.tensor(
[0.48145466, 0.4578275, 0.40821073]
)[:, None, None]
reverse_norm_std = torch.tensor(
[0.26862954, 0.26130258, 0.27577711]
)[:, None, None]
image_tensor = (
image_tensor * reverse_norm_std + reverse_norm_mean
)
# pil_img = T.ToPILImage()(image_tensor)
image_h = pil_img.height
image_w = pil_img.width
image = np.array(pil_img)[:, :, [2, 1, 0]]
else:
raise ValueError(
f"invaild image format, {type(image)} for {image}"
)
if len(entities) == 0:
return image
new_image = image.copy()
previous_bboxes = []
# size of text
text_size = 1
# thickness of text
text_line = 1 # int(max(1 * min(image_h, image_w) / 512, 1))
box_line = 3
(c_width, text_height), _ = cv2.getTextSize(
"F", cv2.FONT_HERSHEY_COMPLEX, text_size, text_line
)
base_height = int(text_height * 0.675)
text_offset_original = text_height - base_height
text_spaces = 3
for entity_name, (start, end), bboxes in entities:
for x1_norm, y1_norm, x2_norm, y2_norm in bboxes:
orig_x1, orig_y1, orig_x2, orig_y2 = (
int(x1_norm * image_w),
int(y1_norm * image_h),
int(x2_norm * image_w),
int(y2_norm * image_h),
)
# draw bbox
# random color
color = tuple(np.random.randint(0, 255, size=3).tolist())
new_image = cv2.rectangle(
new_image,
(orig_x1, orig_y1),
(orig_x2, orig_y2),
color,
box_line,
)
l_o, r_o = (
box_line // 2 + box_line % 2,
box_line // 2 + box_line % 2 + 1,
)
x1 = orig_x1 - l_o
y1 = orig_y1 - l_o
if (
y1
< text_height + text_offset_original + 2 * text_spaces
):
y1 = (
orig_y1
+ r_o
+ text_height
+ text_offset_original
+ 2 * text_spaces
)
x1 = orig_x1 + r_o
# add text background
(text_width, text_height), _ = cv2.getTextSize(
f" {entity_name}",
cv2.FONT_HERSHEY_COMPLEX,
text_size,
text_line,
)
text_bg_x1, text_bg_y1, text_bg_x2, text_bg_y2 = (
x1,
y1
- (
text_height
+ text_offset_original
+ 2 * text_spaces
),
x1 + text_width,
y1,
)
for prev_bbox in previous_bboxes:
while is_overlapping(
(
text_bg_x1,
text_bg_y1,
text_bg_x2,
text_bg_y2,
),
prev_bbox,
):
text_bg_y1 += (
text_height
+ text_offset_original
+ 2 * text_spaces
)
text_bg_y2 += (
text_height
+ text_offset_original
+ 2 * text_spaces
)
y1 += (
text_height
+ text_offset_original
+ 2 * text_spaces
)
if text_bg_y2 >= image_h:
text_bg_y1 = max(
0,
image_h
- (
text_height
+ text_offset_original
+ 2 * text_spaces
),
)
text_bg_y2 = image_h
y1 = image_h
break
alpha = 0.5
for i in range(text_bg_y1, text_bg_y2):
for j in range(text_bg_x1, text_bg_x2):
if i < image_h and j < image_w:
if j < text_bg_x1 + 1.35 * c_width:
# original color
bg_color = color
else:
# white
bg_color = [255, 255, 255]
new_image[i, j] = (
alpha * new_image[i, j]
+ (1 - alpha) * np.array(bg_color)
).astype(np.uint8)
cv2.putText(
new_image,
f" {entity_name}",
(x1, y1 - text_offset_original - 1 * text_spaces),
cv2.FONT_HERSHEY_COMPLEX,
text_size,
(0, 0, 0),
text_line,
cv2.LINE_AA,
)
# previous_locations.append((x1, y1))
previous_bboxes.append(
(text_bg_x1, text_bg_y1, text_bg_x2, text_bg_y2)
)
pil_image = Image.fromarray(new_image[:, :, [2, 1, 0]])
if save_path:
pil_image.save(save_path)
if show:
pil_image.show()
return new_image
def generate_boxees(self, task, image_url):
image = self.get_image(image_url)
processed_text, entities = self.process_task(task, image)

@ -333,6 +333,9 @@ class Agent(BaseStructure):
self.planning = planning
self.planning_prompt = planning_prompt
self.device = device
self.custom_planning_prompt = custom_planning_prompt
self.rules = rules
self.custom_tools_prompt = custom_tools_prompt
# Name
self.name = agent_name

@ -0,0 +1,78 @@
from typing import Any, Callable, Dict, List, Optional
from pydantic import BaseModel
class AgentSchemaBaseModel(BaseModel):
id: Optional[str] = None
llm: Optional[Any] = None
template: Optional[str] = None
max_loops: Optional[int] = None
stopping_condition: Optional[Callable[[str], bool]] = None
loop_interval: Optional[int] = None
retry_attempts: Optional[int] = None
retry_interval: Optional[int] = None
return_history: Optional[bool] = False
stopping_token: Optional[str] = None
dynamic_loops: Optional[bool] = False
interactive: Optional[bool] = False
dashboard: Optional[bool] = False
agent_name: Optional[str] = None
agent_description: Optional[str] = None
system_prompt: Optional[str] = None
tools: Optional[List[Callable]] = None
dynamic_temperature_enabled: Optional[bool] = None
sop: Optional[str] = None
sop_list: Optional[List[str]] = None
saved_state_path: Optional[str] = None
autosave: Optional[bool] = True
context_length: Optional[int] = None
user_name: Optional[str] = None
self_healing_enabled: Optional[bool] = None
code_interpreter: Optional[bool] = False
multi_modal: Optional[bool] = False
pdf_path: Optional[str] = None
list_of_pdf: Optional[str] = None
tokenizer: Optional[Any] = None
long_term_memory: Optional[Any] = None
preset_stopping_token: Optional[bool] = False
traceback: Optional[Any] = None
traceback_handlers: Optional[Any] = None
streaming_on: Optional[bool] = False
docs: Optional[List[str]] = None
docs_folder: Optional[str] = None
verbose: Optional[bool] = True
parser: Optional[Callable] = None
best_of_n: Optional[int] = None
callback: Optional[Callable] = None
metadata: Optional[Dict[str, Any]] = None
callbacks: Optional[List[Callable]] = None
logger_handler: Optional[Any] = None
search_algorithm: Optional[Callable] = None
logs_to_filename: Optional[str] = None
evaluator: Optional[Callable] = None
output_json: Optional[bool] = False
stopping_func: Optional[Callable] = None
custom_loop_condition: Optional[Callable] = None
sentiment_threshold: Optional[float] = None
custom_exit_command: Optional[str] = None
sentiment_analyzer: Optional[Callable] = None
limit_tokens_from_string: Optional[Callable] = None
custom_tools_prompt: Optional[Callable] = None
tool_schema: Optional[Any] = None
output_type: Optional[Any] = None
function_calling_type: Optional[str] = None
output_cleaner: Optional[Callable] = None
function_calling_format_type: Optional[str] = None
list_base_models: Optional[List[Any]] = None
metadata_output_type: Optional[str] = None
state_save_file_type: Optional[str] = None
chain_of_thoughts: Optional[bool] = False
algorithm_of_thoughts: Optional[bool] = False
tree_of_thoughts: Optional[bool] = False
tool_choice: Optional[str] = None
execute_tool: Optional[bool] = False
rules: Optional[str] = None
planning: Optional[str] = None
planning_prompt: Optional[str] = None
device: Optional[str] = None
custom_planning_prompt: Optional[str] = None

@ -0,0 +1,11 @@
"""
A loop is a sequence of instructions that is continually repeated until a certain condition is met. In the context of agent-based systems, a loop can be used to control the behavior of an agent, such as how many times it interacts with other agents or how long it runs for.
- Code Example:
loop = Loop(
PlanGeneratorAgent(),
ExecutionAgent(),
max_loops=10,
)
"""

@ -1,17 +1,55 @@
"""
Todo
- [ ] Test the new api feature
- [ ] Add the agent schema for every agent -- following OpenAI assistants schema
- [ ] then add the swarm schema for the swarm url: /v1/swarms/{swarm_name}/agents/{agent_id}
- [ ] Add the agent schema for the agent url: /v1/swarms/{swarm_name}/agents/{agent_id}
"""
import asyncio
import logging
import multiprocessing
import queue
import threading
from typing import List, Optional
from typing import Dict, List, Optional
# from fastapi import FastAPI
import tenacity
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from swarms.structs.agent import Agent
from swarms.structs.base_structure import BaseStructure
from swarms.utils.logger import logger # noqa: F401
from swarms.structs.base_swarm import BaseSwarm
from swarms.utils.loguru_logger import logger
# Pydantic models
class TaskRequest(BaseModel):
task: str
# Pydantic models
class TaskResponse(BaseModel):
result: str
class AgentInfo(BaseModel):
agent_name: str
agent_description: str
class SwarmNetwork(BaseStructure):
class SwarmInfo(BaseModel):
swarm_name: str
swarm_description: str
agents: List[AgentInfo]
# Helper function to get the number of workers
def get_number_of_workers():
return multiprocessing.cpu_count()
# [TODO] Add the agent schema for every agent -- following OpenAI assistants schema
class SwarmNetwork(BaseSwarm):
"""
SwarmNetwork class
@ -68,15 +106,23 @@ class SwarmNetwork(BaseStructure):
def __init__(
self,
name: str = None,
description: str = None,
agents: List[Agent] = None,
idle_threshold: float = 0.2,
busy_threshold: float = 0.7,
api_enabled: Optional[bool] = False,
logging_enabled: Optional[bool] = False,
api_on: Optional[bool] = False,
host: str = "0.0.0.0",
port: int = 8000,
swarm_callable: Optional[callable] = None,
*args,
**kwargs,
):
super().__init__()
super().__init__(*args, **kwargs)
self.name = name
self.description = description
self.agents = agents
self.task_queue = queue.Queue()
self.idle_threshold = idle_threshold
@ -84,15 +130,37 @@ class SwarmNetwork(BaseStructure):
self.lock = threading.Lock()
self.api_enabled = api_enabled
self.logging_enabled = logging_enabled
self.agent_pool = []
self.host = host
self.port = port
self.swarm_callable = swarm_callable
# Ensure that the agents list is not empty
if not agents:
raise ValueError("The agents list cannot be empty")
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
# Create a dictionary of agents for easy access
self.agent_dict = {agent.id: agent for agent in agents}
# For each agent in the pool, run it on its own thread
if agents is not None:
for agent in agents:
self.agent_pool.append(agent)
self.agents.append(agent)
# Create the FastAPI instance
if api_on is True:
logger.info("Creating FastAPI instance")
self.app = FastAPI(debug=True, *args, **kwargs)
self.app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
logger.info("Routes set for creation")
self._create_routes()
def add_task(self, task):
"""Add task to the task queue
@ -145,6 +213,98 @@ class SwarmNetwork(BaseStructure):
)
raise error
def _create_routes(self) -> None:
"""
Creates the routes for the API.
"""
# Extensive logging
logger.info("Creating routes for the API")
# Routes available
logger.info(
"Routes available: /v1/swarms, /v1/health, /v1/swarms/{swarm_name}/agents/{agent_id}, /v1/swarms/{swarm_name}/run"
)
@self.app.get("/v1/swarms", response_model=SwarmInfo)
async def get_swarms() -> SwarmInfo:
try:
logger.info("Getting swarm information")
return SwarmInfo(
swarm_name=self.name,
swarm_description=self.description,
agents=[
AgentInfo(
agent_name=agent.agent_name,
agent_description=agent.agent_description,
)
for agent in self.agents
],
)
except Exception as e:
logger.error(f"Error getting swarm information: {str(e)}")
raise HTTPException(
status_code=500, detail="Internal Server Error"
)
@self.app.get("/v1/health")
async def get_health() -> Dict[str, str]:
try:
logger.info("Checking health status")
return {"status": "healthy"}
except Exception as e:
logger.error(f"Error checking health status: {str(e)}")
raise HTTPException(
status_code=500, detail="Internal Server Error"
)
@self.app.get(f"/v1/swarms/{self.swarm_name}/agents/{{agent_id}}")
async def get_agent_info(agent_id: str) -> AgentInfo:
try:
logger.info(f"Getting information for agent {agent_id}")
agent = self.agent_dict.get(agent_id)
if not agent:
raise HTTPException(
status_code=404, detail="Agent not found"
)
return AgentInfo(
agent_name=agent.agent_name,
agent_description=agent.agent_description,
)
except Exception as e:
logger.error(f"Error getting agent information: {str(e)}")
raise HTTPException(
status_code=500, detail="Internal Server Error"
)
@self.app.post(
f"/v1/swarms/{self.swarm_name}/agents/{{agent_id}}/run",
response_model=TaskResponse,
)
async def run_agent_task(
task_request: TaskRequest,
) -> TaskResponse:
try:
logger.info("Running agent task")
# Assuming only one agent in the swarm for this example
agent = self.agents[0]
logger.info(f"Running agent task: {task_request.task}")
result = agent.run(task_request.task)
return TaskResponse(result=result)
except Exception as e:
logger.error(f"Error running agent task: {str(e)}")
raise HTTPException(
status_code=500, detail="Internal Server Error"
)
def get_app(self) -> FastAPI:
"""
Returns the FastAPI instance.
Returns:
FastAPI: The FastAPI instance.
"""
return self.app
def run_single_agent(
self, agent_id, task: Optional[str], *args, **kwargs
):
@ -162,7 +322,7 @@ class SwarmNetwork(BaseStructure):
"""
self.logger.info(f"Running task {task} on agent {agent_id}")
try:
for agent in self.agent_pool:
for agent in self.agents:
if agent.id == agent_id:
out = agent.run(task, *args, **kwargs)
return out
@ -184,8 +344,7 @@ class SwarmNetwork(BaseStructure):
self.logger.info(f"Running task {task} on all agents")
try:
return [
agent.run(task, *args, **kwargs)
for agent in self.agent_pool
agent.run(task, *args, **kwargs) for agent in self.agents
]
except Exception as error:
logger.error(f"Error running task on agents: {error}")
@ -196,8 +355,8 @@ class SwarmNetwork(BaseStructure):
self.logger.info("[Listing all active agents]")
try:
# Assuming self.agent_pool is a list of agent objects
for agent in self.agent_pool:
# Assuming self.agents is a list of agent objects
for agent in self.agents:
self.logger.info(
f"[Agent] [ID: {agent.id}] [Name:"
f" {agent.agent_name}] [Description:"
@ -219,7 +378,7 @@ class SwarmNetwork(BaseStructure):
self.logger.info(f"Getting agent {agent_id}")
try:
for agent in self.agent_pool:
for agent in self.agents:
if agent.id == agent_id:
return agent
raise ValueError(f"No agent found with ID {agent_id}")
@ -235,7 +394,7 @@ class SwarmNetwork(BaseStructure):
"""
self.logger.info(f"Adding agent {agent} to pool")
try:
self.agent_pool.append(agent)
self.agents.append(agent)
except Exception as error:
print(f"Error adding agent to pool: {error}")
raise error
@ -248,9 +407,9 @@ class SwarmNetwork(BaseStructure):
"""
self.logger.info(f"Removing agent {agent_id} from pool")
try:
for agent in self.agent_pool:
for agent in self.agents:
if agent.id == agent_id:
self.agent_pool.remove(agent)
self.agents.remove(agent)
return
raise ValueError(f"No agent found with ID {agent_id}")
except Exception as error:
@ -281,7 +440,7 @@ class SwarmNetwork(BaseStructure):
self.logger.info(f"Scaling up agent pool by {num_agents}")
try:
for _ in range(num_agents):
self.agent_pool.append(Agent())
self.agents.append(Agent())
except Exception as error:
print(f"Error scaling up agent pool: {error}")
raise error
@ -293,32 +452,56 @@ class SwarmNetwork(BaseStructure):
num_agents (int, optional): _description_. Defaults to 1.
"""
for _ in range(num_agents):
self.agent_pool.pop()
self.agents.pop()
@tenacity.retry(
wait=tenacity.wait_fixed(1),
stop=tenacity.stop_after_attempt(3),
retry=tenacity.retry_if_exception_type(Exception),
)
def run(self, *args, **kwargs):
"""run the swarm network"""
app = self.get_app()
# - Create APIs for each agent in the pool (optional) with fastapi
def create_apis_for_agents(self):
"""Create APIs for each agent in the pool (optional) with fastapi
try:
import uvicorn
Returns:
_type_: _description_
"""
self.apis = []
for agent in self.agent_pool:
self.api.get(f"/{agent.id}")
logger.info(
f"Running the swarm network with {len(self.agents)} on {self.host}:{self.port}"
)
uvicorn.run(
app,
host=self.host,
port=self.port,
# workers=get_number_of_workers(),
*args,
**kwargs,
)
def run_agent(task: str, *args, **kwargs):
return agent.run(task, *args, **kwargs)
return app
except Exception as error:
logger.error(f"Error running the swarm network: {error}")
raise error
self.apis.append(self.api)
def run(self):
"""run the swarm network"""
# Observe all agents in the pool
self.logger.info("Starting the SwarmNetwork")
for agent in self.agent_pool:
self.logger.info(f"Starting agent {agent.id}")
self.logger.info(
f"[Agent][{agent.id}] [Status] [Running] [Awaiting"
" Task]"
)
# # # Example usage
# if __name__ == "__main__":
# agent1 = Agent(
# agent_name="Covid-19-Chat",
# agent_description="This agent provides information about COVID-19 symptoms.",
# llm=OpenAIChat(),
# max_loops="auto",
# autosave=True,
# verbose=True,
# stopping_condition="finish",
# )
# agents = [agent1] # Add more agents as needed
# swarm_name = "HealthSwarm"
# swarm_description = (
# "A swarm of agents providing health-related information."
# )
# agent_api = SwarmNetwork(swarm_name, swarm_description, agents)
# agent_api.run()
