[FEAT][Tool prompts segmentation][FEAT][SAM][FEAT][Task, tasks tests][GPT4VisionAPI optimization]

pull/210/head
Kye 1 year ago
parent df59d3642e
commit a443666501

@ -69,6 +69,10 @@ class BaseMultiModalModel:
device: Optional[str] = "cuda", device: Optional[str] = "cuda",
max_new_tokens: Optional[int] = 500, max_new_tokens: Optional[int] = 500,
retries: Optional[int] = 3, retries: Optional[int] = 3,
system_prompt: Optional[str] = None,
meta_prompt: Optional[str] = None,
*args,
**kwargs,
): ):
self.model_name = model_name self.model_name = model_name
self.temperature = temperature self.temperature = temperature
@ -265,3 +269,17 @@ class BaseMultiModalModel:
""" """
for chunk in content: for chunk in content:
print(chunk) print(chunk)
def meta_prompt(self):
    """Return the instruction text on how to cite labels found on an image.

    Returns:
        str: A prompt asking the model to wrap every referenced label or
        marking in square brackets and to list labels individually
        instead of using ranges.
    """
    # The literal's lines stay flush-left so no indentation enters the text.
    return """
For any labels or markings on an image that you reference in your response, please
enclose them in square brackets ([]) and list them explicitly. Do not use ranges; for
example, instead of '1 - 4', list as '[1], [2], [3], [4]'. These labels could be
numbers or letters and typically correspond to specific segments or parts of the image.
"""

@ -67,6 +67,10 @@ class GPT4VisionAPI:
openai_proxy: str = "https://api.openai.com/v1/chat/completions", openai_proxy: str = "https://api.openai.com/v1/chat/completions",
beautify: bool = False, beautify: bool = False,
streaming_enabled: Optional[bool] = False, streaming_enabled: Optional[bool] = False,
meta_prompt: Optional[bool] = None,
system_prompt: Optional[str] = None,
*args,
**kwargs,
): ):
super().__init__() super().__init__()
self.openai_api_key = openai_api_key self.openai_api_key = openai_api_key
@ -77,6 +81,8 @@ class GPT4VisionAPI:
self.openai_proxy = openai_proxy self.openai_proxy = openai_proxy
self.beautify = beautify self.beautify = beautify
self.streaming_enabled = streaming_enabled self.streaming_enabled = streaming_enabled
self.meta_prompt = meta_prompt
self.system_prompt = system_prompt
if self.logging_enabled: if self.logging_enabled:
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
@ -85,6 +91,9 @@ class GPT4VisionAPI:
logging.getLogger("requests").setLevel(logging.WARNING) logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING) logging.getLogger("urllib3").setLevel(logging.WARNING)
if self.meta_prompt:
self.system_prompt = self.meta_prompt_init()
def encode_image(self, img: str): def encode_image(self, img: str):
"""Encode image to base64.""" """Encode image to base64."""
with open(img, "rb") as image_file: with open(img, "rb") as image_file:
@ -112,6 +121,7 @@ class GPT4VisionAPI:
payload = { payload = {
"model": "gpt-4-vision-preview", "model": "gpt-4-vision-preview",
"messages": [ "messages": [
{"role": "system", "content": [self.system_prompt]},
{ {
"role": "user", "role": "user",
"content": [ "content": [
@ -125,7 +135,7 @@ class GPT4VisionAPI:
}, },
}, },
], ],
} },
], ],
"max_tokens": self.max_tokens, "max_tokens": self.max_tokens,
} }
@ -244,6 +254,7 @@ class GPT4VisionAPI:
payload = { payload = {
"model": "gpt-4-vision-preview", "model": "gpt-4-vision-preview",
"messages": [ "messages": [
{"role": "system", "content": [self.system_prompt]},
{ {
"role": "user", "role": "user",
"content": [ "content": [
@ -257,7 +268,7 @@ class GPT4VisionAPI:
}, },
}, },
], ],
} },
], ],
"max_tokens": self.max_tokens, "max_tokens": self.max_tokens,
} }
@ -425,3 +436,17 @@ class GPT4VisionAPI:
) )
) )
return dashboard return dashboard
def meta_prompt_init(self):
    """Build the meta prompt describing how image labels must be cited.

    Returns:
        str: Instructions telling the model to enclose each referenced
        label or marking in square brackets and to enumerate labels one
        by one rather than as ranges.
    """
    # The literal's lines stay flush-left so no indentation enters the text.
    return """
For any labels or markings on an image that you reference in your response, please
enclose them in square brackets ([]) and list them explicitly. Do not use ranges; for
example, instead of '1 - 4', list as '[1], [2], [3], [4]'. These labels could be
numbers or letters and typically correspond to specific segments or parts of the image.
"""

@ -0,0 +1,291 @@
import cv2
import numpy as np
from PIL import Image
from transformers import SamImageProcessor, SamModel, SamProcessor, pipeline
try:
import cv2
import supervision as sv
except ImportError:
print("Please install supervision and cv")
from enum import Enum
class FeatureType(Enum):
    """
    Kinds of mask features that mask adjustment can operate on during image
    segmentation.

    Members:
        ISLAND: Selects island removal.
        HOLE: Selects hole filling.
    """

    ISLAND = "ISLAND"
    HOLE = "HOLE"

    @classmethod
    def list(cls):
        """Return the values of all members in definition order."""
        return [member.value for member in cls]
def compute_mask_iou_vectorized(masks: np.ndarray) -> np.ndarray:
    """
    Vectorized computation of the Intersection over Union (IoU) for all pairs of masks.

    Pairwise intersections are obtained with a single matrix product of the
    flattened masks, and unions as `area_i + area_j - intersection`, so no
    `(N, N, H * W)` temporary array is allocated.

    Parameters:
        masks (np.ndarray): A 3D numpy array with shape `(N, H, W)`, where `N` is the
            number of masks, `H` is the height, and `W` is the width. Non-zero
            entries are treated as mask pixels.

    Returns:
        np.ndarray: A 2D numpy array of shape `(N, N)` where each element `[i, j]` is
            the IoU between masks `i` and `j`.

    Raises:
        ValueError: If any of the masks is found to be empty.
    """
    num_masks = masks.shape[0]
    # Spell out the column count: `reshape(N, -1)` cannot infer it when N == 0.
    pixels_per_mask = int(np.prod(masks.shape[1:]))
    masks_flat = masks.astype(bool).reshape(num_masks, pixels_per_mask)

    areas = masks_flat.sum(axis=1)
    if np.any(areas == 0):
        raise ValueError(
            "One or more masks are empty. Please filter out empty masks before"
            " using `compute_mask_iou_vectorized` function."
        )

    # A bool @ bool product would itself be boolean, so count overlaps in
    # float64, which represents these integer pixel counts exactly.
    masks_float = masks_flat.astype(np.float64)
    intersection = masks_float @ masks_float.T
    union = areas[:, None] + areas[None, :] - intersection
    return intersection / union


def mask_non_max_suppression(
    masks: np.ndarray, iou_threshold: float = 0.6
) -> np.ndarray:
    """
    Performs Non-Max Suppression on a set of masks by prioritizing larger masks and
    removing smaller masks that overlap significantly.

    Masks are visited from largest to smallest area. Every mask that is still kept
    suppresses all other masks whose IoU with it exceeds the threshold.

    Parameters:
        masks (np.ndarray): A 3D numpy array with shape `(N, H, W)`, where `N` is the
            number of masks, `H` is the height, and `W` is the width.
        iou_threshold (float): The IoU threshold for determining significant overlap.

    Returns:
        np.ndarray: A 3D numpy array of filtered masks, in their original order.

    Raises:
        ValueError: If any of the masks is empty (raised by
            `compute_mask_iou_vectorized`).
    """
    num_masks = masks.shape[0]
    areas = masks.sum(axis=(1, 2))
    sorted_idx = np.argsort(-areas)
    keep_mask = np.ones(num_masks, dtype=bool)
    iou_matrix = compute_mask_iou_vectorized(masks)

    for idx in sorted_idx:
        if not keep_mask[idx]:
            continue
        overlapping_masks = iou_matrix[idx] > iou_threshold
        overlapping_masks[idx] = False  # a mask never suppresses itself
        # Both arrays are indexed by original mask position, so combine them
        # directly; re-indexing `keep_mask` by `sorted_idx` here would pair
        # each flag with the wrong mask.
        keep_mask &= ~overlapping_masks

    return masks[keep_mask]
def filter_masks_by_relative_area(
    masks: np.ndarray, minimum_area: float = 0.01, maximum_area: float = 1.0
) -> np.ndarray:
    """
    Keeps only the masks whose area, expressed as a fraction of the full `H * W`
    frame, lies inside the inclusive range `[minimum_area, maximum_area]`.

    Parameters:
        masks (np.ndarray): A 3D numpy array with shape `(N, H, W)`, where `N` is the
            number of masks, `H` is the height, and `W` is the width.
        minimum_area (float): The minimum relative area threshold. Must be between `0`
            and `1`.
        maximum_area (float): The maximum relative area threshold. Must be between `0`
            and `1`.

    Returns:
        np.ndarray: A 3D numpy array containing masks that fall within the specified
            relative area range.

    Raises:
        ValueError: If `masks` is not a 3D numpy array, if `minimum_area` or
            `maximum_area` are outside the `0` to `1` range, or if `minimum_area`
            is greater than `maximum_area`.
    """
    is_3d_array = isinstance(masks, np.ndarray) and masks.ndim == 3
    if not is_3d_array:
        raise ValueError("Input must be a 3D numpy array.")

    bounds_in_unit_range = (0 <= minimum_area <= 1) and (0 <= maximum_area <= 1)
    if not bounds_in_unit_range:
        raise ValueError(
            "`minimum_area` and `maximum_area` must be between 0 and 1."
        )

    if minimum_area > maximum_area:
        raise ValueError(
            "`minimum_area` must be less than or equal to `maximum_area`."
        )

    frame_pixels = masks.shape[1] * masks.shape[2]
    coverage = masks.sum(axis=(1, 2)) / frame_pixels
    within_range = (coverage >= minimum_area) & (coverage <= maximum_area)
    return masks[within_range]
def adjust_mask_features_by_relative_area(
    mask: np.ndarray,
    area_threshold: float,
    feature_type: FeatureType = FeatureType.ISLAND,
) -> np.ndarray:
    """
    Cleans up a single mask by erasing small islands or filling small holes, where
    "small" means a contour area below `area_threshold` relative to the full frame.

    Warning:
        Running this function on a mask with small islands may result in empty masks.

    Parameters:
        mask (np.ndarray): A 2D numpy array with shape `(H, W)`, where `H` is the
            height, and `W` is the width.
        area_threshold (float): Threshold for relative area to remove or fill features.
        feature_type (FeatureType): Type of feature to adjust (`ISLAND` for removing
            islands, `HOLE` for filling holes).

    Returns:
        np.ndarray: A 2D boolean numpy array containing the adjusted mask.
    """
    rows, cols = mask.shape
    frame_pixels = cols * rows
    canvas = np.uint8(mask * 255)

    removing_islands = feature_type == FeatureType.ISLAND
    # Islands: outer contours only, painted black. Holes: two-level contour
    # retrieval, painted white.
    retrieval_mode = cv2.RETR_EXTERNAL if removing_islands else cv2.RETR_CCOMP
    paint_value = 0 if removing_islands else 255

    outlines, _ = cv2.findContours(
        canvas, retrieval_mode, cv2.CHAIN_APPROX_SIMPLE
    )
    for outline in outlines:
        relative_area = cv2.contourArea(outline) / frame_pixels
        if relative_area < area_threshold:
            # thickness=-1 paints the contour interior instead of its outline.
            cv2.drawContours(
                image=canvas,
                contours=[outline],
                contourIdx=-1,
                color=paint_value,
                thickness=-1,
            )

    return np.where(canvas > 0, 1, 0).astype(bool)
def masks_to_marks(masks: np.ndarray) -> sv.Detections:
    """
    Wraps a stack of masks in a marks (sv.Detections) object.

    Parameters:
        masks (np.ndarray): A 3D numpy array with shape `(N, H, W)`, where `N` is the
            number of masks, `H` is the height, and `W` is the width.

    Returns:
        sv.Detections: An object containing the masks and the bounding box
            coordinates derived from them via `sv.mask_to_xyxy`.
    """
    bounding_boxes = sv.mask_to_xyxy(masks=masks)
    return sv.Detections(mask=masks, xyxy=bounding_boxes)
def refine_marks(
    marks: sv.Detections,
    maximum_hole_area: float = 0.01,
    maximum_island_area: float = 0.01,
    minimum_mask_area: float = 0.02,
    maximum_mask_area: float = 1.0,
) -> sv.Detections:
    """
    Refines a set of masks by removing small islands and holes, and filtering by mask
    area.

    Masks that become empty after island removal and hole filling are dropped. If
    that leaves no masks at all, an empty set of detections is returned.

    Parameters:
        marks (sv.Detections): An object containing the masks and their bounding box
            coordinates.
        maximum_hole_area (float): The maximum relative area of holes to be filled in
            each mask.
        maximum_island_area (float): The maximum relative area of islands to be removed
            from each mask.
        minimum_mask_area (float): The minimum relative area for a mask to be retained.
        maximum_mask_area (float): The maximum relative area for a mask to be retained.

    Returns:
        sv.Detections: An object containing the refined masks and their bounding box
            coordinates.
    """
    cleaned_masks = []
    for mask in marks.mask:
        mask = adjust_mask_features_by_relative_area(
            mask=mask,
            area_threshold=maximum_island_area,
            feature_type=FeatureType.ISLAND,
        )
        mask = adjust_mask_features_by_relative_area(
            mask=mask,
            area_threshold=maximum_hole_area,
            feature_type=FeatureType.HOLE,
        )
        if np.any(mask):
            cleaned_masks.append(mask)

    if cleaned_masks:
        result_masks = np.array(cleaned_masks)
    else:
        # `np.array([])` is 1-D and would be rejected by
        # `filter_masks_by_relative_area`; keep an explicit `(0, H, W)` stack so
        # "nothing survived" flows through the same path as a non-empty result.
        # NOTE(review): assumes `marks.mask` is array-like with shape (N, H, W).
        frame_shape = tuple(np.asarray(marks.mask).shape[1:])
        result_masks = np.zeros((0,) + frame_shape, dtype=bool)

    result_masks = filter_masks_by_relative_area(
        masks=result_masks,
        minimum_area=minimum_mask_area,
        maximum_area=maximum_mask_area,
    )
    return sv.Detections(
        mask=result_masks, xyxy=sv.mask_to_xyxy(masks=result_masks)
    )
class SegmentAnythingMarkGenerator:
    """
    Generates segmentation marks for an image with a pretrained SAM checkpoint.

    Parameters:
        device (str): The device to run the model on (e.g., 'cpu', 'cuda').
        model_name (str): The name of the model to be loaded. Defaults to
            'facebook/sam-vit-huge'.
    """

    def __init__(
        self, device: str = "cpu", model_name: str = "facebook/sam-vit-huge"
    ):
        self.model = SamModel.from_pretrained(model_name).to(device)
        self.processor = SamProcessor.from_pretrained(model_name)
        self.image_processor = SamImageProcessor.from_pretrained(model_name)

        # The mask-generation pipeline reuses the model and image processor
        # loaded above.
        pipeline_options = {
            "task": "mask-generation",
            "model": self.model,
            "image_processor": self.image_processor,
            "device": device,
        }
        self.pipeline = pipeline(**pipeline_options)

    def run(self, image: np.ndarray) -> sv.Detections:
        """
        Generate image segmentation marks.

        Parameters:
            image (np.ndarray): The image to be marked in BGR format.

        Returns:
            sv.Detections: An object containing the segmentation masks and their
                corresponding bounding box coordinates.
        """
        # Convert BGR -> RGB before handing the frame to PIL.
        rgb_pixels = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        pil_image = Image.fromarray(rgb_pixels)
        generated = self.pipeline(pil_image, points_per_batch=64)
        mask_stack = np.array(generated["masks"])
        return masks_to_marks(masks=mask_stack)

@ -0,0 +1,13 @@
# System prompt for the autonomous agent loop. The text describes the agent's
# role: multi-step conversations with itself or the user, long-form content
# generation, and completing user-assigned tasks.
# NOTE(review): the wording contains small slips ("a autonomous", "your self");
# they are left untouched because this string is sent to the model verbatim.
FLOW_SYSTEM_PROMPT = """
You are an autonomous agent granted autonomy in a autonomous loop structure.
Your role is to engage in multi-step conversations with your self or the user,
generate long-form content like blogs, screenplays, or SOPs,
and accomplish tasks bestowed by the user.
You can have internal dialogues with yourself or can interact with the user
to aid in these complex tasks. Your responses should be coherent, contextually relevant, and tailored to the task at hand.
"""

@ -0,0 +1,61 @@
# Prompt fragment telling the model how to leave the autonomous loop: once the
# task is finished it should emit the special token <DONE>.
DYNAMIC_STOP_PROMPT = """
Now, when you 99% sure you have completed the task, you may follow the instructions below to escape the autonomous loop.
When you have finished the task from the Human, output a special token: <DONE>
This will enable you to leave the autonomous loop.
"""
# Prompt fragment describing the structure the model should emit to call tools.
# TODO: make it able to handle multi input tools.
# NOTE(review): the example below is not valid JSON as written (unquoted
# `tool1` key, unterminated `"tool2` / `"tool3` keys, missing commas, repeated
# "tool1" / "params" keys) — confirm whether the model is expected to copy it.
# NOTE(review): `{tools}` looks like a substitution placeholder; the other
# literal braces in this text would break `str.format`, so verify how the
# placeholder is filled in by callers.
DYNAMICAL_TOOL_USAGE = """
You have access to the following tools:
Output a JSON object with the following structure to use the tools
commands: {
"tools": {
tool1: "tool_name",
"params": {
"tool1": "inputs",
"tool1": "inputs"
}
"tool2: "tool_name",
"params": {
"tool1": "inputs",
"tool1": "inputs"
}
"tool3: "tool_name",
"params": {
"tool1": "inputs",
"tool1": "inputs"
}
}
}
-------------TOOLS---------------------------
{tools}
"""
# Example tool-command structure, given without any surrounding instructions.
# NOTE(review): the example is not valid JSON as written (unquoted `tool1`
# key, unterminated `"tool2` / `"tool3` keys, missing commas, repeated keys)
# — confirm this is intentional.
SCENARIOS = """
commands: {
"tools": {
tool1: "tool_name",
"params": {
"tool1": "inputs",
"tool1": "inputs"
}
"tool2: "tool_name",
"params": {
"tool1": "inputs",
"tool1": "inputs"
}
"tool3: "tool_name",
"params": {
"tool1": "inputs",
"tool1": "inputs"
}
}
}
"""

@ -17,80 +17,12 @@ from swarms.prompts.multi_modal_autonomous_instruction_prompt import (
) )
from swarms.utils.pdf_to_text import pdf_to_text from swarms.utils.pdf_to_text import pdf_to_text
# System prompt from swarms.prompts.tools import (
FLOW_SYSTEM_PROMPT = f""" DYNAMIC_STOP_PROMPT,
You are an autonomous agent granted autonomy in a autonomous loop structure. DYNAMICAL_TOOL_USAGE,
Your role is to engage in multi-step conversations with your self or the user, SCENARIOS
generate long-form content like blogs, screenplays, or SOPs, )
and accomplish tasks bestowed by the user. from swarms.prompts.agent_system_prompts import FLOW_SYSTEM_PROMPT
You can have internal dialogues with yourself or can interact with the user
to aid in these complex tasks. Your responses should be coherent, contextually relevant, and tailored to the task at hand.
"""
# Prompts
DYNAMIC_STOP_PROMPT = """
Now, when you 99% sure you have completed the task, you may follow the instructions below to escape the autonomous loop.
When you have finished the task from the Human, output a special token: <DONE>
This will enable you to leave the autonomous loop.
"""
# Make it able to handle multi input tools
DYNAMICAL_TOOL_USAGE = """
You have access to the following tools:
Output a JSON object with the following structure to use the tools
commands: {
"tools": {
tool1: "tool_name",
"params": {
"tool1": "inputs",
"tool1": "inputs"
}
"tool2: "tool_name",
"params": {
"tool1": "inputs",
"tool1": "inputs"
}
"tool3: "tool_name",
"params": {
"tool1": "inputs",
"tool1": "inputs"
}
}
}
-------------TOOLS---------------------------
{tools}
"""
SCENARIOS = """
commands: {
"tools": {
tool1: "tool_name",
"params": {
"tool1": "inputs",
"tool1": "inputs"
}
"tool2: "tool_name",
"params": {
"tool1": "inputs",
"tool1": "inputs"
}
"tool3: "tool_name",
"params": {
"tool1": "inputs",
"tool1": "inputs"
}
}
}
"""
def autonomous_agent_prompt( def autonomous_agent_prompt(
tools_prompt: str = DYNAMICAL_TOOL_USAGE, tools_prompt: str = DYNAMICAL_TOOL_USAGE,

@ -1,97 +0,0 @@
from swarms.models import OpenAIChat
from swarms.structs.agent import Agent
import concurrent.futures
from typing import Callable, List, Dict, Any, Sequence
class Task:
    """
    A unit of work executed by a chain of agents.

    Args:
        id (str): A unique identifier for the task.
        task (str): The task text handed to every agent.
        flows (Sequence[Agent]): Agents that run the task one after another.
        dependencies (Sequence[str], optional): Ids of tasks whose results are
            passed to the first agent. Defaults to no dependencies.
    """

    def __init__(
        self,
        id: str,
        task: str,
        flows: Sequence[Agent],
        dependencies: Sequence[str] = (),
    ):
        self.id = id
        self.task = task
        self.flows = flows
        # Each instance gets its own list: a `[]` default would be created once
        # and shared (and mutated) across every Task built without dependencies.
        self.dependencies = list(dependencies or ())
        self.results = []

    def execute(self, parent_results: Dict[str, Any]):
        """
        Run the task through every agent in order, collecting each result.

        Args:
            parent_results (Dict[str, Any]): Results keyed by task id; must contain
                an entry for every id in `self.dependencies`.

        Raises:
            KeyError: If a dependency id is missing from `parent_results`.
        """
        args = [parent_results[dep] for dep in self.dependencies]
        for agent in self.flows:
            result = agent.run(self.task, *args)
            self.results.append(result)
            # The output of one agent becomes the input to the next
            args = [result]
class Workflow:
    """
    Runs a set of tasks in dependency order, executing every currently runnable
    task on a thread pool before moving on to the next wave.
    """

    def __init__(self):
        self.tasks: Dict[str, Task] = {}
        self.executor = concurrent.futures.ThreadPoolExecutor()

    def add_task(self, task: Task):
        """Register a task under its id, replacing any task with the same id."""
        self.tasks[task.id] = task

    def run(self):
        """
        Execute all registered tasks, wave by wave.

        A task becomes runnable once every id in its `dependencies` has completed.
        All runnable tasks of a wave are submitted together and awaited before the
        next wave is computed.

        Raises:
            ValueError: If tasks remain but none of them is runnable, i.e. a
                dependency is missing from the workflow or dependencies form a
                cycle. Without this check the loop would spin forever.
        """
        completed_tasks = set()
        while len(completed_tasks) < len(self.tasks):
            runnable = [
                task
                for task in self.tasks.values()
                if task.id not in completed_tasks
                and all(dep in completed_tasks for dep in task.dependencies)
            ]
            if not runnable:
                pending = sorted(set(self.tasks) - completed_tasks)
                raise ValueError(
                    "Workflow cannot make progress; tasks with missing or"
                    f" circular dependencies: {pending}"
                )

            futures = [
                (
                    self.executor.submit(
                        task.execute,
                        {dep: self.tasks[dep].results for dep in task.dependencies},
                    ),
                    task.id,
                )
                for task in runnable
            ]
            for future, task_id in futures:
                future.result()  # Wait for task completion
                completed_tasks.add(task_id)

    def get_results(self):
        """Return a mapping of task id to that task's list of results."""
        return {task_id: task.results for task_id, task in self.tasks.items()}
# Demo script: these statements sit at module level, so they run whenever the
# module is executed or imported.

# Create the shared LLM and four single-loop agents ("flows").
# NOTE(review): "sk-" looks like a placeholder API key — confirm it is replaced
# before running.
llm = OpenAIChat(openai_api_key="sk-")
flow1 = Agent(llm, max_loops=1)
flow2 = Agent(llm, max_loops=1)
flow3 = Agent(llm, max_loops=1)
flow4 = Agent(llm, max_loops=1)
# Create tasks with their respective Agents and task strings.
# task2 and task3 both depend on task1; task2 chains two agents.
task1 = Task("task1", "Generate a summary on Quantum field theory", [flow1])
task2 = Task(
    "task2",
    "Elaborate on the summary of topic X",
    [flow2, flow3],
    dependencies=["task1"],
)
task3 = Task(
    "task3", "Generate conclusions for topic X", [flow4], dependencies=["task1"]
)
# Create a workflow and add tasks
workflow = Workflow()
workflow.add_task(task1)
workflow.add_task(task2)
workflow.add_task(task3)
# Run the workflow
workflow.run()
# Get results (a mapping of task id to that task's list of results) and print them
results = workflow.get_results()
print(results)

@ -0,0 +1,49 @@
from swarms.structs.agent import Agent
from typing import List, Dict, Any, Sequence
class Task:
    """
    Task is a unit of work that can be executed by a set of agents.

    A task is defined by a task name and a set of agents that can execute the task.
    The task can also have a set of dependencies, which are the names of other tasks
    that must be executed before this task can be executed.

    Args:
        id (str): A unique identifier for the task
        task (str): The name of the task
        agents (Sequence[Agent]): A list of agents that can execute the task
        dependencies (Sequence[str], optional): Task names that must be executed
            before this task can be executed. Defaults to no dependencies.

    Methods:
        execute(parent_results: Dict[str, Any]): Executes the task by passing the
            results of the parent tasks to the agents.
    """

    def __init__(
        self,
        id: str,
        task: str,
        agents: Sequence[Agent],
        dependencies: Sequence[str] = (),
    ):
        self.id = id
        self.task = task
        self.agents = agents
        # Each instance gets its own list: a `[]` default would be created once
        # and shared (and mutated) across every Task built without dependencies.
        self.dependencies = list(dependencies or ())
        self.results = []

    def execute(self, parent_results: Dict[str, Any]):
        """Executes the task by passing the results of the parent tasks to the agents.

        The first agent receives the task plus the results of all dependencies;
        every later agent receives the task plus the previous agent's result.
        Each agent's result is appended to `self.results`.

        Args:
            parent_results (Dict[str, Any]): Results keyed by task id; must contain
                an entry for every id in `self.dependencies`.

        Raises:
            KeyError: If a dependency id is missing from `parent_results`.
        """
        args = [parent_results[dep] for dep in self.dependencies]
        for agent in self.agents:
            result = agent.run(self.task, *args)
            self.results.append(result)
            # The output of one agent becomes the input to the next
            args = [result]

@ -0,0 +1,106 @@
import os
from unittest.mock import Mock
import pytest
from dotenv import load_dotenv
from swarms.models.gpt4_vision_api import GPT4VisionAPI
from swarms.prompts.multi_modal_autonomous_instruction_prompt import (
MULTI_MODAL_AUTO_AGENT_SYSTEM_PROMPT_1,
)
from swarms.structs.agent import Agent
from swarms.structs.task import Task
load_dotenv()
@pytest.fixture
def llm():
    """Provide a GPT4VisionAPI instance built with its default arguments."""
    vision_model = GPT4VisionAPI()
    return vision_model
def test_agent_run_task(llm):
    """Run an auto-looping agent on an image task and check the result's shape."""
    image_path = "assembly_line.jpg"
    instruction = (
        "Analyze this image of an assembly line and identify any issues such as"
        " misaligned parts, defects, or deviations from the standard assembly"
        " process. IF there is anything unsafe in the image, explain why it is"
        " unsafe and how it could be improved."
    )

    vision_agent = Agent(
        llm=llm,
        max_loops="auto",
        sop=MULTI_MODAL_AUTO_AGENT_SYSTEM_PROMPT_1,
        dashboard=True,
    )
    outcome = vision_agent.run(task=instruction, img=image_path)

    # Verify the expected behavior of the agent's run method
    assert isinstance(outcome, dict)
    assert "response" in outcome
    assert "dashboard_data" in outcome
    # Add more assertions as needed
@pytest.fixture
def task(llm):
    """Provide a Task with five agents and no dependencies.

    The `llm` fixture is requested explicitly. Without it, the name `llm`
    would resolve to the module-level fixture function instead of the model
    instance it produces.
    """
    agents = [Agent(llm=llm, id=f"Agent_{i}") for i in range(5)]
    return Task(id="Task_1", task="Task_Name", agents=agents, dependencies=[])
# Basic tests
def test_task_init(task):
    """The fixture task exposes the id, name, agents and dependencies it was given."""
    assert (task.id, task.task) == ("Task_1", "Task_Name")
    assert isinstance(task.agents, list)
    assert len(task.agents) == 5
    assert isinstance(task.dependencies, list)
def test_task_execute(task, mocker):
    """Executing the task records one integer result per agent."""
    mocker.patch.object(Agent, "run", side_effect=[1, 2, 3, 4, 5])
    task.execute({})
    outputs = task.results
    assert isinstance(outputs, list)
    assert len(outputs) == 5
    assert all(isinstance(item, int) for item in outputs)
# Parameterized tests
@pytest.mark.parametrize("num_agents", [1, 3, 5, 10])
def test_task_num_agents(task, num_agents, mocker):
    """The number of recorded results matches the number of agents."""
    replacement_agents = []
    for index in range(num_agents):
        replacement_agents.append(Agent(id=f"Agent_{index}"))
    task.agents = replacement_agents

    mocker.patch.object(Agent, "run", return_value=1)
    task.execute({})
    assert len(task.results) == num_agents
# Exception testing
def test_task_execute_with_dependency_error(task, mocker):
    """A dependency that is absent from the parent results raises KeyError."""
    mocker.patch.object(Agent, "run", return_value=1)
    task.dependencies = ["NonExistentTask"]
    no_parent_results = {}
    with pytest.raises(KeyError):
        task.execute(no_parent_results)
# Mocking and monkeypatching tests
def test_task_execute_with_mocked_agents(task, mocker):
    """The task runs every agent even when they are all replaced by mocks."""
    stand_ins = []
    for _ in range(5):
        stand_in = Mock(spec=Agent)
        stand_in.run.return_value = 1
        stand_ins.append(stand_in)
    mocker.patch.object(task, "agents", stand_ins)

    task.execute({})
    assert len(task.results) == 5
Loading…
Cancel
Save