From 63236dbee36d803a29a4052f22a252c705813b8c Mon Sep 17 00:00:00 2001 From: Kye Date: Tue, 26 Dec 2023 12:40:11 -0500 Subject: [PATCH] [FEAT][SimpleAgent] --- .gitignore | 1 + README.md | 7 +- playground/agents/simple_agent.py | 16 +- swarms/agents/simple_agent.py | 39 ++++ swarms/models/sam.py | 364 +++++++----------------------- 5 files changed, 132 insertions(+), 295 deletions(-) create mode 100644 swarms/agents/simple_agent.py diff --git a/.gitignore b/.gitignore index 93f8e5c0..ac6be257 100644 --- a/.gitignore +++ b/.gitignore @@ -18,6 +18,7 @@ venv swarms/agents/.DS_Store _build +conversation.txt stderr_log.txt .vscode diff --git a/README.md b/README.md index fe4eac4b..3de9b66c 100644 --- a/README.md +++ b/README.md @@ -464,6 +464,7 @@ print(video_path) - Plug in and play conversational agent with `GPT4`, `Mixytral`, or any of our models - Reliable conversational structure to hold messages together with dynamic handling for long context conversations and interactions with auto chunking - Reliable, this simple system will always provide responses you want. + ```python import os @@ -474,7 +475,9 @@ from swarms import ( Conversation, ) -conv = Conversation() +conv = Conversation( + time_enabled=True, +) # Load the environment variables load_dotenv() @@ -499,7 +502,7 @@ def interactive_conversation(llm): out = llm(task) conv.add("assistant", out) print( - f"Assistant: {out}", #color="cyan" + f"Assistant: {out}", ) conv.display_conversation() conv.export_conversation("conversation.txt") diff --git a/playground/agents/simple_agent.py b/playground/agents/simple_agent.py index 934a5298..dd46083b 100644 --- a/playground/agents/simple_agent.py +++ b/playground/agents/simple_agent.py @@ -5,10 +5,11 @@ from dotenv import load_dotenv from swarms import ( OpenAIChat, Conversation, - # display_markdown_message, ) -conv = Conversation() +conv = Conversation( + time_enabled=True, +) # Load the environment variables load_dotenv() @@ -19,10 +20,11 @@ api_key = os.environ.get("OPENAI_API_KEY") # Initialize the language model llm = OpenAIChat(openai_api_key=api_key, model_name="gpt-4") + # Run the language model in a loop -def interactive_conversation(llm): +def interactive_conversation(llm, iters: int = 10): conv = Conversation() - while True: + for i in range(iters): user_input = input("User: ") conv.add("user", user_input) if user_input.lower() == "quit": @@ -33,10 +35,10 @@ def interactive_conversation(llm): out = llm(task) conv.add("assistant", out) print( - f"Assistant: {out}", #color="cyan" + f"Assistant: {out}", ) - conv.display_conversation() - conv.export_conversation("conversation.txt") + conv.display_conversation() + conv.export_conversation("conversation.txt") # Replace with your LLM instance diff --git a/swarms/agents/simple_agent.py b/swarms/agents/simple_agent.py new file mode 100644 index 00000000..3e4a65ae --- /dev/null +++ b/swarms/agents/simple_agent.py @@ -0,0 +1,39 @@ +from swarms import Conversation, AbstractLLM + + +# Run the language model in a loop for n iterations +def SimpleAgent( + llm: AbstractLLM = None, iters: int = 10, *args, **kwargs +): + """Simple agent conversation + + Args: + llm (_type_): _description_ + iters (int, optional): _description_. Defaults to 10. + """ + try: + conv = Conversation(*args, **kwargs) + for i in range(iters): + user_input = input("User: ") + conv.add("user", user_input) + if user_input.lower() == "quit": + break + task = ( + conv.return_history_as_string() + ) # Get the conversation history + out = llm(task) + conv.add("assistant", out) + print( + f"Assistant: {out}", + ) + conv.display_conversation() + conv.export_conversation("conversation.txt") + + except Exception as error: + print(f"[ERROR][SimpleAgentConversation] {error}") + raise error + + except KeyboardInterrupt: + print("[INFO][SimpleAgentConversation] Keyboard interrupt") + conv.export_conversation("conversation.txt") + raise KeyboardInterrupt diff --git a/swarms/models/sam.py b/swarms/models/sam.py index 866c79ee..110d80b7 100644 --- a/swarms/models/sam.py +++ b/swarms/models/sam.py @@ -1,315 +1,107 @@ -import cv2 -import numpy as np +import torch from PIL import Image -from transformers import ( - SamImageProcessor, - SamModel, - SamProcessor, - pipeline, -) +import requests +from transformers import SamModel, SamProcessor +from typing import List -try: - import cv2 - import supervision as sv -except ImportError: - print("Please install supervision and cv") +device = "cuda" if torch.cuda.is_available() else "cpu" -from enum import Enum - - -class FeatureType(Enum): - """ - An enumeration to represent the types of features for mask adjustment in image - segmentation. - """ - - ISLAND = "ISLAND" - HOLE = "HOLE" - - @classmethod - def list(cls): - return list(map(lambda c: c.value, cls)) - - -def compute_mask_iou_vectorized(masks: np.ndarray) -> np.ndarray: - """ - Vectorized computation of the Intersection over Union (IoU) for all pairs of masks. - - Parameters: - masks (np.ndarray): A 3D numpy array with shape `(N, H, W)`, where `N` is the - number of masks, `H` is the height, and `W` is the width. - - Returns: - np.ndarray: A 2D numpy array of shape `(N, N)` where each element `[i, j]` is - the IoU between masks `i` and `j`. - - Raises: - ValueError: If any of the masks is found to be empty. - """ - if np.any(masks.sum(axis=(1, 2)) == 0): - raise ValueError( - "One or more masks are empty. Please filter out empty" - " masks before using `compute_iou_vectorized` function." - ) - - masks_bool = masks.astype(bool) - masks_flat = masks_bool.reshape(masks.shape[0], -1) - intersection = np.logical_and( - masks_flat[:, None], masks_flat[None, :] - ).sum(axis=2) - union = np.logical_or( - masks_flat[:, None], masks_flat[None, :] - ).sum(axis=2) - iou_matrix = intersection / union - return iou_matrix - - -def mask_non_max_suppression( - masks: np.ndarray, iou_threshold: float = 0.6 -) -> np.ndarray: +class SAM: """ - Performs Non-Max Suppression on a set of masks by prioritizing larger masks and - removing smaller masks that overlap significantly. + Class representing the SAM (Segmentation and Masking) model. - When the IoU between two masks exceeds the specified threshold, the smaller mask - (in terms of area) is discarded. This process is repeated for each pair of masks, - effectively filtering out masks that are significantly overlapped by larger ones. + Args: + model_name (str): The name of the pre-trained SAM model. Default is "facebook/sam-vit-huge". + device (torch.device): The device to run the model on. Default is the current device. + input_points (List[List[int]]): The 2D location of a window in the image to segment. Default is [[450, 600]]. + *args: Additional positional arguments. + **kwargs: Additional keyword arguments. - Parameters: - masks (np.ndarray): A 3D numpy array with shape `(N, H, W)`, where `N` is the - number of masks, `H` is the height, and `W` is the width. - iou_threshold (float): The IoU threshold for determining significant overlap. + Attributes: + model_name (str): The name of the pre-trained SAM model. + device (torch.device): The device to run the model on. + input_points (List[List[int]]): The 2D location of a window in the image to segment. + model (SamModel): The pre-trained SAM model. + processor (SamProcessor): The processor for the SAM model. - Returns: - np.ndarray: A 3D numpy array of filtered masks. - """ - num_masks = masks.shape[0] - areas = masks.sum(axis=(1, 2)) - sorted_idx = np.argsort(-areas) - keep_mask = np.ones(num_masks, dtype=bool) - iou_matrix = compute_mask_iou_vectorized(masks) - for i in range(num_masks): - if not keep_mask[sorted_idx[i]]: - continue - - overlapping_masks = iou_matrix[sorted_idx[i]] > iou_threshold - overlapping_masks[sorted_idx[i]] = False - keep_mask[sorted_idx] = np.logical_and( - keep_mask[sorted_idx], ~overlapping_masks - ) - - return masks[keep_mask] - - -def filter_masks_by_relative_area( - masks: np.ndarray, - minimum_area: float = 0.01, - maximum_area: float = 1.0, -) -> np.ndarray: - """ - Filters masks based on their relative area within the total area of each mask. - - Parameters: - masks (np.ndarray): A 3D numpy array with shape `(N, H, W)`, where `N` is the - number of masks, `H` is the height, and `W` is the width. - minimum_area (float): The minimum relative area threshold. Must be between `0` - and `1`. - maximum_area (float): The maximum relative area threshold. Must be between `0` - and `1`. - - Returns: - np.ndarray: A 3D numpy array containing masks that fall within the specified - relative area range. + Methods: + run(task=None, img=None, *args, **kwargs): Runs the SAM model on the given image and returns the segmentation scores and masks. + process_img(img: str = None, *args, **kwargs): Processes the input image and returns the processed image. - Raises: - ValueError: If `minimum_area` or `maximum_area` are outside the `0` to `1` - range, or if `minimum_area` is greater than `maximum_area`. """ - if not (isinstance(masks, np.ndarray) and masks.ndim == 3): - raise ValueError("Input must be a 3D numpy array.") - - if not (0 <= minimum_area <= 1) or not (0 <= maximum_area <= 1): - raise ValueError( - "`minimum_area` and `maximum_area` must be between 0" - " and 1." - ) - - if minimum_area > maximum_area: - raise ValueError( - "`minimum_area` must be less than or equal to" - " `maximum_area`." - ) - - total_area = masks.shape[1] * masks.shape[2] - relative_areas = masks.sum(axis=(1, 2)) / total_area - return masks[ - (relative_areas >= minimum_area) - & (relative_areas <= maximum_area) - ] - - -def adjust_mask_features_by_relative_area( - mask: np.ndarray, - area_threshold: float, - feature_type: FeatureType = FeatureType.ISLAND, -) -> np.ndarray: - """ - Adjusts a mask by removing small islands or filling small holes based on a relative - area threshold. - - !!! warning - - Running this function on a mask with small islands may result in empty masks. - - Parameters: - mask (np.ndarray): A 2D numpy array with shape `(H, W)`, where `H` is the - height, and `W` is the width. - area_threshold (float): Threshold for relative area to remove or fill features. - feature_type (FeatureType): Type of feature to adjust (`ISLAND` for removing - islands, `HOLE` for filling holes). - - Returns: - np.ndarray: A 2D numpy array containing mask. - """ - height, width = mask.shape - total_area = width * height + def __init__( + self, + model_name: str = "facebook/sam-vit-huge", + device=device, + input_points: List[List[int]] = [[450, 600]], + *args, + **kwargs, + ): + self.model_name = model_name + self.device = device + self.input_points = input_points - mask = np.uint8(mask * 255) - operation = ( - cv2.RETR_EXTERNAL - if feature_type == FeatureType.ISLAND - else cv2.RETR_CCOMP - ) - contours, _ = cv2.findContours( - mask, operation, cv2.CHAIN_APPROX_SIMPLE - ) + self.model = SamModel.from_pretrained( + model_name, *args, **kwargs + ).to(device) - for contour in contours: - area = cv2.contourArea(contour) - relative_area = area / total_area - if relative_area < area_threshold: - cv2.drawContours( - image=mask, - contours=[contour], - contourIdx=-1, - color=( - 0 if feature_type == FeatureType.ISLAND else 255 - ), - thickness=-1, - ) - return np.where(mask > 0, 1, 0).astype(bool) + self.processor = SamProcessor.from_pretrained(model_name) + def run(self, task=None, img=None, *args, **kwargs): + """ + Runs the SAM model on the given image and returns the segmentation scores and masks. -def masks_to_marks(masks: np.ndarray) -> sv.Detections: - """ - Converts a set of masks to a marks (sv.Detections) object. + Args: + task: The task to perform. Not used in this method. + img: The input image to segment. + *args: Additional positional arguments. + **kwargs: Additional keyword arguments. - Parameters: - masks (np.ndarray): A 3D numpy array with shape `(N, H, W)`, where `N` is the - number of masks, `H` is the height, and `W` is the width. + Returns: + Tuple: A tuple containing the segmentation scores and masks. - Returns: - sv.Detections: An object containing the masks and their bounding box - coordinates. - """ - return sv.Detections( - mask=masks, xyxy=sv.mask_to_xyxy(masks=masks) - ) + """ + img = self.process_img(img) + # Specify the points of the mask to segment + input_points = [ + self.input_points + ] # 2D location of a window in the image -def refine_marks( - marks: sv.Detections, - maximum_hole_area: float = 0.01, - maximum_island_area: float = 0.01, - minimum_mask_area: float = 0.02, - maximum_mask_area: float = 1.0, -) -> sv.Detections: - """ - Refines a set of masks by removing small islands and holes, and filtering by mask - area. + # Preprocess the image + inputs = self.processor( + img, input_points=input_points, return_tensors="pt" + ).to(device) - Parameters: - marks (sv.Detections): An object containing the masks and their bounding box - coordinates. - maximum_hole_area (float): The maximum relative area of holes to be filled in - each mask. - maximum_island_area (float): The maximum relative area of islands to be removed - from each mask. - minimum_mask_area (float): The minimum relative area for a mask to be retained. - maximum_mask_area (float): The maximum relative area for a mask to be retained. + with torch.no_grad(): + outputs = self.model(**inputs) # noqa: E999 - Returns: - sv.Detections: An object containing the masks and their bounding box - coordinates. - """ - result_masks = [] - for mask in marks.mask: - mask = adjust_mask_features_by_relative_area( - mask=mask, - area_threshold=maximum_island_area, - feature_type=FeatureType.ISLAND, + masks = self.processor.image_processor.post_process_masks( + outputs.pred_masks.cpu(), + inputs["original_sizes"].cpu(), + inputs["reshaped_input_sizes"].cpu(), ) - mask = adjust_mask_features_by_relative_area( - mask=mask, - area_threshold=maximum_hole_area, - feature_type=FeatureType.HOLE, - ) - if np.any(mask): - result_masks.append(mask) - result_masks = np.array(result_masks) - result_masks = filter_masks_by_relative_area( - masks=result_masks, - minimum_area=minimum_mask_area, - maximum_area=maximum_mask_area, - ) - return sv.Detections( - mask=result_masks, xyxy=sv.mask_to_xyxy(masks=result_masks) - ) - - -class SegmentAnythingMarkGenerator: - """ - A class for performing image segmentation using a specified model. + scores = outputs.iou_scores - Parameters: - device (str): The device to run the model on (e.g., 'cpu', 'cuda'). - model_name (str): The name of the model to be loaded. Defaults to - 'facebook/sam-vit-huge'. - """ - - def __init__( - self, - device: str = "cpu", - model_name: str = "facebook/sam-vit-huge", - ): - self.model = SamModel.from_pretrained(model_name).to(device) - self.processor = SamProcessor.from_pretrained(model_name) - self.image_processor = SamImageProcessor.from_pretrained( - model_name - ) - self.pipeline = pipeline( - task="mask-generation", - model=self.model, - image_processor=self.image_processor, - device=device, - ) + return scores, masks - def run(self, image: np.ndarray) -> sv.Detections: + def process_img(self, img: str = None, *args, **kwargs): """ - Generate image segmentation marks. + Processes the input image and returns the processed image. - Parameters: - image (np.ndarray): The image to be marked in BGR format. + Args: + img (str): The URL or file path of the input image. + *args: Additional positional arguments. + **kwargs: Additional keyword arguments. Returns: - sv.Detections: An object containing the segmentation masks and their - corresponding bounding box coordinates. + Image: The processed image. + """ - image = Image.fromarray( - cv2.cvtColor(image, cv2.COLOR_BGR2RGB) - ) - outputs = self.pipeline(image, points_per_batch=64) - masks = np.array(outputs["masks"]) - return masks_to_marks(masks=masks) + raw_image = Image.open( + requests.get(img, stream=True, *args, **kwargs).raw + ).convert("RGB") + + return raw_image