[FEAT][SimpleAgent]

pull/336/head
Kye 1 year ago
parent 36b022ed41
commit 63236dbee3

.gitignore vendored (1 addition)

@@ -18,6 +18,7 @@ venv
 swarms/agents/.DS_Store
 _build
+conversation.txt
 stderr_log.txt
 .vscode

@@ -464,6 +464,7 @@ print(video_path)
 - Plug in and play conversational agent with `GPT4`, `Mixytral`, or any of our models
 - Reliable conversational structure to hold messages together with dynamic handling for long context conversations and interactions with auto chunking
 - Reliable, this simple system will always provide responses you want.

 ```python
 import os
@@ -474,7 +475,9 @@ from swarms import (
     Conversation,
 )

-conv = Conversation()
+conv = Conversation(
+    time_enabled=True,
+)

 # Load the environment variables
 load_dotenv()
@@ -499,7 +502,7 @@ def interactive_conversation(llm):
         out = llm(task)
         conv.add("assistant", out)
         print(
-            f"Assistant: {out}", #color="cyan"
+            f"Assistant: {out}",
         )
     conv.display_conversation()
     conv.export_conversation("conversation.txt")
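The only functional change in these README hunks is the `time_enabled=True` flag. Judging by the flag name, it makes the `Conversation` structure record when each message is added, so the exported `conversation.txt` carries timing information; the diff itself does not show that behavior, so treat the sketch below as an assumption:

```python
from swarms import Conversation

# Assumption: with time_enabled=True each add() also records a timestamp
# for the message (inferred from the flag name, not shown in this diff).
conv = Conversation(time_enabled=True)
conv.add("user", "hello")
conv.display_conversation()
```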

@@ -5,10 +5,11 @@ from dotenv import load_dotenv
 from swarms import (
     OpenAIChat,
     Conversation,
-    # display_markdown_message,
 )

-conv = Conversation()
+conv = Conversation(
+    time_enabled=True,
+)

 # Load the environment variables
 load_dotenv()
@@ -19,10 +20,11 @@ api_key = os.environ.get("OPENAI_API_KEY")
 # Initialize the language model
 llm = OpenAIChat(openai_api_key=api_key, model_name="gpt-4")

 # Run the language model in a loop
-def interactive_conversation(llm):
+def interactive_conversation(llm, iters: int = 10):
     conv = Conversation()
-    while True:
+    for i in range(iters):
         user_input = input("User: ")
         conv.add("user", user_input)
         if user_input.lower() == "quit":
@@ -33,7 +35,7 @@ def interactive_conversation(llm):
         out = llm(task)
         conv.add("assistant", out)
         print(
-            f"Assistant: {out}", #color="cyan"
+            f"Assistant: {out}",
         )
     conv.display_conversation()
     conv.export_conversation("conversation.txt")
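With the loop bounded by `iters`, a session now ends after at most `iters` turns even if the user never types "quit". A minimal sketch of starting the updated loop, reusing the `llm` initialized earlier in this script; the explicit call is an assumption, since the diff does not show how the function is invoked:

```python
# Run a short, bounded session with the llm configured above
# (assumed invocation; not shown in the diff).
interactive_conversation(llm, iters=5)
```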

@@ -0,0 +1,39 @@
from swarms import Conversation, AbstractLLM


# Run the language model in a loop for n iterations
def SimpleAgent(
    llm: AbstractLLM = None, iters: int = 10, *args, **kwargs
):
    """Run a simple interactive conversation loop against an LLM.

    Args:
        llm (AbstractLLM): Language model used to generate each assistant reply.
        iters (int, optional): Maximum number of conversation turns. Defaults to 10.
    """
    try:
        conv = Conversation(*args, **kwargs)
        for i in range(iters):
            user_input = input("User: ")
            conv.add("user", user_input)
            if user_input.lower() == "quit":
                break
            task = (
                conv.return_history_as_string()
            )  # Get the conversation history
            out = llm(task)
            conv.add("assistant", out)
            print(
                f"Assistant: {out}",
            )
        conv.display_conversation()
        conv.export_conversation("conversation.txt")
    except Exception as error:
        print(f"[ERROR][SimpleAgentConversation] {error}")
        raise error
    except KeyboardInterrupt:
        print("[INFO][SimpleAgentConversation] Keyboard interrupt")
        conv.export_conversation("conversation.txt")
        raise KeyboardInterrupt
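For context, a minimal sketch of wiring `SimpleAgent` up with the same model used in the playground script above. The import path for `SimpleAgent` is an assumption, since the diff does not show where the new module lives or whether it is re-exported from the top-level package:

```python
import os

from dotenv import load_dotenv
from swarms import OpenAIChat
from swarms import SimpleAgent  # assumed import path; adjust to the actual module

load_dotenv()

# Mirrors the playground setup: OPENAI_API_KEY is expected in the environment.
llm = OpenAIChat(
    openai_api_key=os.environ.get("OPENAI_API_KEY"), model_name="gpt-4"
)

# Up to 5 user turns; extra kwargs (e.g. time_enabled) are forwarded to Conversation.
SimpleAgent(llm, iters=5, time_enabled=True)
```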

@@ -1,315 +1,107 @@

Removed (previous implementation):

import cv2
import numpy as np
from PIL import Image
from transformers import (
    SamImageProcessor,
    SamModel,
    SamProcessor,
    pipeline,
)

try:
    import cv2
    import supervision as sv
except ImportError:
    print("Please install supervision and cv")

from enum import Enum


class FeatureType(Enum):
    """
    An enumeration to represent the types of features for mask adjustment in image
    segmentation.
    """

    ISLAND = "ISLAND"
    HOLE = "HOLE"

    @classmethod
    def list(cls):
        return list(map(lambda c: c.value, cls))


def compute_mask_iou_vectorized(masks: np.ndarray) -> np.ndarray:
    """
    Vectorized computation of the Intersection over Union (IoU) for all pairs of masks.

    Parameters:
        masks (np.ndarray): A 3D numpy array with shape `(N, H, W)`, where `N` is the
            number of masks, `H` is the height, and `W` is the width.

    Returns:
        np.ndarray: A 2D numpy array of shape `(N, N)` where each element `[i, j]` is
            the IoU between masks `i` and `j`.

    Raises:
        ValueError: If any of the masks is found to be empty.
    """
    if np.any(masks.sum(axis=(1, 2)) == 0):
        raise ValueError(
            "One or more masks are empty. Please filter out empty"
            " masks before using `compute_iou_vectorized` function."
        )

    masks_bool = masks.astype(bool)
    masks_flat = masks_bool.reshape(masks.shape[0], -1)
    intersection = np.logical_and(
        masks_flat[:, None], masks_flat[None, :]
    ).sum(axis=2)
    union = np.logical_or(
        masks_flat[:, None], masks_flat[None, :]
    ).sum(axis=2)
    iou_matrix = intersection / union
    return iou_matrix


def mask_non_max_suppression(
    masks: np.ndarray, iou_threshold: float = 0.6
) -> np.ndarray:
    """
    Performs Non-Max Suppression on a set of masks by prioritizing larger masks and
    removing smaller masks that overlap significantly.

    When the IoU between two masks exceeds the specified threshold, the smaller mask
    (in terms of area) is discarded. This process is repeated for each pair of masks,
    effectively filtering out masks that are significantly overlapped by larger ones.

    Parameters:
        masks (np.ndarray): A 3D numpy array with shape `(N, H, W)`, where `N` is the
            number of masks, `H` is the height, and `W` is the width.
        iou_threshold (float): The IoU threshold for determining significant overlap.

    Returns:
        np.ndarray: A 3D numpy array of filtered masks.
    """
    num_masks = masks.shape[0]
    areas = masks.sum(axis=(1, 2))
    sorted_idx = np.argsort(-areas)
    keep_mask = np.ones(num_masks, dtype=bool)
    iou_matrix = compute_mask_iou_vectorized(masks)
    for i in range(num_masks):
        if not keep_mask[sorted_idx[i]]:
            continue

        overlapping_masks = iou_matrix[sorted_idx[i]] > iou_threshold
        overlapping_masks[sorted_idx[i]] = False
        keep_mask[sorted_idx] = np.logical_and(
            keep_mask[sorted_idx], ~overlapping_masks
        )
    return masks[keep_mask]


def filter_masks_by_relative_area(
    masks: np.ndarray,
    minimum_area: float = 0.01,
    maximum_area: float = 1.0,
) -> np.ndarray:
    """
    Filters masks based on their relative area within the total area of each mask.

    Parameters:
        masks (np.ndarray): A 3D numpy array with shape `(N, H, W)`, where `N` is the
            number of masks, `H` is the height, and `W` is the width.
        minimum_area (float): The minimum relative area threshold. Must be between `0`
            and `1`.
        maximum_area (float): The maximum relative area threshold. Must be between `0`
            and `1`.

    Returns:
        np.ndarray: A 3D numpy array containing masks that fall within the specified
            relative area range.

    Raises:
        ValueError: If `minimum_area` or `maximum_area` are outside the `0` to `1`
            range, or if `minimum_area` is greater than `maximum_area`.
    """
    if not (isinstance(masks, np.ndarray) and masks.ndim == 3):
        raise ValueError("Input must be a 3D numpy array.")

    if not (0 <= minimum_area <= 1) or not (0 <= maximum_area <= 1):
        raise ValueError(
            "`minimum_area` and `maximum_area` must be between 0"
            " and 1."
        )

    if minimum_area > maximum_area:
        raise ValueError(
            "`minimum_area` must be less than or equal to"
            " `maximum_area`."
        )

    total_area = masks.shape[1] * masks.shape[2]
    relative_areas = masks.sum(axis=(1, 2)) / total_area
    return masks[
        (relative_areas >= minimum_area)
        & (relative_areas <= maximum_area)
    ]


def adjust_mask_features_by_relative_area(
    mask: np.ndarray,
    area_threshold: float,
    feature_type: FeatureType = FeatureType.ISLAND,
) -> np.ndarray:
    """
    Adjusts a mask by removing small islands or filling small holes based on a relative
    area threshold.

    !!! warning

        Running this function on a mask with small islands may result in empty masks.

    Parameters:
        mask (np.ndarray): A 2D numpy array with shape `(H, W)`, where `H` is the
            height, and `W` is the width.
        area_threshold (float): Threshold for relative area to remove or fill features.
        feature_type (FeatureType): Type of feature to adjust (`ISLAND` for removing
            islands, `HOLE` for filling holes).

    Returns:
        np.ndarray: A 2D numpy array containing mask.
    """
    height, width = mask.shape
    total_area = width * height
    mask = np.uint8(mask * 255)
    operation = (
        cv2.RETR_EXTERNAL
        if feature_type == FeatureType.ISLAND
        else cv2.RETR_CCOMP
    )
    contours, _ = cv2.findContours(
        mask, operation, cv2.CHAIN_APPROX_SIMPLE
    )

    for contour in contours:
        area = cv2.contourArea(contour)
        relative_area = area / total_area
        if relative_area < area_threshold:
            cv2.drawContours(
                image=mask,
                contours=[contour],
                contourIdx=-1,
                color=(
                    0 if feature_type == FeatureType.ISLAND else 255
                ),
                thickness=-1,
            )
    return np.where(mask > 0, 1, 0).astype(bool)


def masks_to_marks(masks: np.ndarray) -> sv.Detections:
    """
    Converts a set of masks to a marks (sv.Detections) object.

    Parameters:
        masks (np.ndarray): A 3D numpy array with shape `(N, H, W)`, where `N` is the
            number of masks, `H` is the height, and `W` is the width.

    Returns:
        sv.Detections: An object containing the masks and their bounding box
            coordinates.
    """
    return sv.Detections(
        mask=masks, xyxy=sv.mask_to_xyxy(masks=masks)
    )


def refine_marks(
    marks: sv.Detections,
    maximum_hole_area: float = 0.01,
    maximum_island_area: float = 0.01,
    minimum_mask_area: float = 0.02,
    maximum_mask_area: float = 1.0,
) -> sv.Detections:
    """
    Refines a set of masks by removing small islands and holes, and filtering by mask
    area.

    Parameters:
        marks (sv.Detections): An object containing the masks and their bounding box
            coordinates.
        maximum_hole_area (float): The maximum relative area of holes to be filled in
            each mask.
        maximum_island_area (float): The maximum relative area of islands to be removed
            from each mask.
        minimum_mask_area (float): The minimum relative area for a mask to be retained.
        maximum_mask_area (float): The maximum relative area for a mask to be retained.

    Returns:
        sv.Detections: An object containing the masks and their bounding box
            coordinates.
    """
    result_masks = []
    for mask in marks.mask:
        mask = adjust_mask_features_by_relative_area(
            mask=mask,
            area_threshold=maximum_island_area,
            feature_type=FeatureType.ISLAND,
        )
        mask = adjust_mask_features_by_relative_area(
            mask=mask,
            area_threshold=maximum_hole_area,
            feature_type=FeatureType.HOLE,
        )
        if np.any(mask):
            result_masks.append(mask)
    result_masks = np.array(result_masks)
    result_masks = filter_masks_by_relative_area(
        masks=result_masks,
        minimum_area=minimum_mask_area,
        maximum_area=maximum_mask_area,
    )
    return sv.Detections(
        mask=result_masks, xyxy=sv.mask_to_xyxy(masks=result_masks)
    )


class SegmentAnythingMarkGenerator:
    """
    A class for performing image segmentation using a specified model.

    Parameters:
        device (str): The device to run the model on (e.g., 'cpu', 'cuda').
        model_name (str): The name of the model to be loaded. Defaults to
            'facebook/sam-vit-huge'.
    """

    def __init__(
        self,
        device: str = "cpu",
        model_name: str = "facebook/sam-vit-huge",
    ):
        self.model = SamModel.from_pretrained(model_name).to(device)
        self.processor = SamProcessor.from_pretrained(model_name)
        self.image_processor = SamImageProcessor.from_pretrained(
            model_name
        )
        self.pipeline = pipeline(
            task="mask-generation",
            model=self.model,
            image_processor=self.image_processor,
            device=device,
        )

    def run(self, image: np.ndarray) -> sv.Detections:
        """
        Generate image segmentation marks.

        Parameters:
            image (np.ndarray): The image to be marked in BGR format.

        Returns:
            sv.Detections: An object containing the segmentation masks and their
                corresponding bounding box coordinates.
        """
        image = Image.fromarray(
            cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        )
        outputs = self.pipeline(image, points_per_batch=64)
        masks = np.array(outputs["masks"])
        return masks_to_marks(masks=masks)

Added (new implementation):

import torch
from PIL import Image
import requests
from transformers import SamModel, SamProcessor
from typing import List

device = "cuda" if torch.cuda.is_available() else "cpu"


class SAM:
    """
    Class representing the SAM (Segmentation and Masking) model.

    Args:
        model_name (str): The name of the pre-trained SAM model. Default is "facebook/sam-vit-huge".
        device (torch.device): The device to run the model on. Default is the current device.
        input_points (List[List[int]]): The 2D location of a window in the image to segment. Default is [[450, 600]].
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Attributes:
        model_name (str): The name of the pre-trained SAM model.
        device (torch.device): The device to run the model on.
        input_points (List[List[int]]): The 2D location of a window in the image to segment.
        model (SamModel): The pre-trained SAM model.
        processor (SamProcessor): The processor for the SAM model.

    Methods:
        run(task=None, img=None, *args, **kwargs): Runs the SAM model on the given image and returns the segmentation scores and masks.
        process_img(img: str = None, *args, **kwargs): Processes the input image and returns the processed image.
    """

    def __init__(
        self,
        model_name: str = "facebook/sam-vit-huge",
        device=device,
        input_points: List[List[int]] = [[450, 600]],
        *args,
        **kwargs,
    ):
        self.model_name = model_name
        self.device = device
        self.input_points = input_points

        self.model = SamModel.from_pretrained(
            model_name, *args, **kwargs
        ).to(device)

        self.processor = SamProcessor.from_pretrained(model_name)

    def run(self, task=None, img=None, *args, **kwargs):
        """
        Runs the SAM model on the given image and returns the segmentation scores and masks.

        Args:
            task: The task to perform. Not used in this method.
            img: The input image to segment.
            *args: Additional positional arguments.
            **kwargs: Additional keyword arguments.

        Returns:
            Tuple: A tuple containing the segmentation scores and masks.
        """
        img = self.process_img(img)

        # Specify the points of the mask to segment
        input_points = [
            self.input_points
        ]  # 2D location of a window in the image

        # Preprocess the image
        inputs = self.processor(
            img, input_points=input_points, return_tensors="pt"
        ).to(device)

        with torch.no_grad():
            outputs = self.model(**inputs)  # noqa: E999

        masks = self.processor.image_processor.post_process_masks(
            outputs.pred_masks.cpu(),
            inputs["original_sizes"].cpu(),
            inputs["reshaped_input_sizes"].cpu(),
        )
        scores = outputs.iou_scores

        return scores, masks

    def process_img(self, img: str = None, *args, **kwargs):
        """
        Processes the input image and returns the processed image.

        Args:
            img (str): The URL or file path of the input image.
            *args: Additional positional arguments.
            **kwargs: Additional keyword arguments.

        Returns:
            Image: The processed image.
        """
        raw_image = Image.open(
            requests.get(img, stream=True, *args, **kwargs).raw
        ).convert("RGB")

        return raw_image
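A minimal sketch of exercising the new `SAM` wrapper. The import path and the image URL are assumptions, since the diff shows neither the module path nor a usage example:

```python
from swarms.models.sam import SAM  # assumed module path; adjust to where the class lives

# Defaults mirror the class signature: ViT-Huge checkpoint and a single prompt point.
sam = SAM(model_name="facebook/sam-vit-huge", input_points=[[450, 600]])

# process_img() fetches the image with requests, so any reachable URL works here.
scores, masks = sam.run(img="https://example.com/sample.png")

print(scores.shape)  # IoU scores predicted for each mask
print(len(masks))    # post-processed masks, one entry per input image
```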
