Merge pull request #4 from kyegomez/master

catch up 20231128 1801
pull/207/head
evelynmitchell committed via GitHub
commit fb0cfaefb4

@@ -24,3 +24,4 @@ agent = Agent(llm=llm, max_loops=1, dashboard=True)
# Run the workflow on a task
out = agent.run("Generate a 10,000 word blog on health and wellness.")
print(out)

@@ -104,9 +104,10 @@ nav:
- Guides:
- Overview: "examples/index.md"
- Agents:
- Flow: "examples/flow.md"
- SequentialWorkflow: "examples/reliable_autonomous_agents.md"
- Agent: "examples/flow.md"
- OmniAgent: "examples/omni_agent.md"
- Swarms:
- SequentialWorkflow: "examples/reliable_autonomous_agents.md"
- 20+ Autonomous Agent Blogs: "examples/ideas.md"
- Applications:
- CustomerSupport:

@@ -1,11 +1,20 @@
from swarms.structs import Agent
import os
from dotenv import load_dotenv
from swarms.models.gpt4_vision_api import GPT4VisionAPI
from swarms.prompts.multi_modal_autonomous_instruction_prompt import (
MULTI_MODAL_AUTO_AGENT_SYSTEM_PROMPT_1,
)
from swarms.structs import Agent
load_dotenv()
api_key = os.environ.get("OPENAI_API_KEY")
llm = GPT4VisionAPI()
llm = GPT4VisionAPI(
openai_api_key=api_key,
)
task = "What is the color of the object?"
img = "images/swarms.jpeg"
@@ -15,6 +24,9 @@ agent = Agent(
llm=llm,
max_loops="auto",
sop=MULTI_MODAL_AUTO_AGENT_SYSTEM_PROMPT_1,
autosave=True,
dashboard=True,
)
agent.run(task=task, img=img)
out = agent.run(task=task, img=img)
print(out)

@@ -27,8 +27,7 @@ api_key = os.getenv("OPENAI_API_KEY")
TASK = """
code
CODE
"""
@@ -38,12 +37,12 @@ llm = OpenAIChat(openai_api_key=api_key, max_tokens=5000)
# Documentation agent
documentation_agent = Agent(
llm=llm, sop=DOCUMENTATION_SOP, max_loops=1, multi_modal=True
llm=llm, sop=DOCUMENTATION_SOP, max_loops=1,
)
# Tests agent
tests_agent = Agent(llm=llm, sop=TEST_SOP, max_loops=2, multi_modal=True)
tests_agent = Agent(llm=llm, sop=TEST_SOP, max_loops=2,)
# Run the documentation agent

@@ -1,11 +1,3 @@
from swarms.memory.pinecone import PineconeVector
from swarms.memory.base import BaseVectorStore
from swarms.memory.pg import PgVectorVectorStore
from swarms.memory.ocean import OceanDB
__all__ = [
"BaseVectorStore",
"PineconeVector",
"PgVectorVectorStore",
"OceanDB",
]
# from swarms.memory.pinecone import PineconeVector
# from swarms.memory.base import BaseVectorStore
# from swarms.memory.pg import PgVectorVectorStore

@@ -9,8 +9,6 @@ from griptape.artifacts import TextArtifact
@define
class BaseVectorStore(ABC):
""" """
DEFAULT_QUERY_COUNT = 5
@dataclass

@@ -1,177 +0,0 @@
import uuid
from abc import ABC
from typing import Any, Dict, List, Optional
from swarms.memory.schemas import Artifact, Status
from swarms.memory.schemas import Step as APIStep
from swarms.memory.schemas import Task as APITask
class Step(APIStep):
additional_properties: Optional[Dict[str, str]] = None
class Task(APITask):
steps: List[Step] = []
class NotFoundException(Exception):
"""
Exception raised when a resource is not found.
"""
def __init__(self, item_name: str, item_id: str):
self.item_name = item_name
self.item_id = item_id
super().__init__(f"{item_name} with {item_id} not found.")
class TaskDB(ABC):
async def create_task(
self,
input: Optional[str],
additional_input: Any = None,
artifacts: Optional[List[Artifact]] = None,
steps: Optional[List[Step]] = None,
) -> Task:
raise NotImplementedError
async def create_step(
self,
task_id: str,
name: Optional[str] = None,
input: Optional[str] = None,
is_last: bool = False,
additional_properties: Optional[Dict[str, str]] = None,
) -> Step:
raise NotImplementedError
async def create_artifact(
self,
task_id: str,
file_name: str,
relative_path: Optional[str] = None,
step_id: Optional[str] = None,
) -> Artifact:
raise NotImplementedError
async def get_task(self, task_id: str) -> Task:
raise NotImplementedError
async def get_step(self, task_id: str, step_id: str) -> Step:
raise NotImplementedError
async def get_artifact(self, task_id: str, artifact_id: str) -> Artifact:
raise NotImplementedError
async def list_tasks(self) -> List[Task]:
raise NotImplementedError
async def list_steps(
self, task_id: str, status: Optional[Status] = None
) -> List[Step]:
raise NotImplementedError
class InMemoryTaskDB(TaskDB):
_tasks: Dict[str, Task] = {}
async def create_task(
self,
input: Optional[str],
additional_input: Any = None,
artifacts: Optional[List[Artifact]] = None,
steps: Optional[List[Step]] = None,
) -> Task:
if not steps:
steps = []
if not artifacts:
artifacts = []
task_id = str(uuid.uuid4())
task = Task(
task_id=task_id,
input=input,
steps=steps,
artifacts=artifacts,
additional_input=additional_input,
)
self._tasks[task_id] = task
return task
async def create_step(
self,
task_id: str,
name: Optional[str] = None,
input: Optional[str] = None,
is_last=False,
additional_properties: Optional[Dict[str, Any]] = None,
) -> Step:
step_id = str(uuid.uuid4())
step = Step(
task_id=task_id,
step_id=step_id,
name=name,
input=input,
status=Status.created,
is_last=is_last,
additional_properties=additional_properties,
)
task = await self.get_task(task_id)
task.steps.append(step)
return step
async def get_task(self, task_id: str) -> Task:
task = self._tasks.get(task_id, None)
if not task:
raise NotFoundException("Task", task_id)
return task
async def get_step(self, task_id: str, step_id: str) -> Step:
task = await self.get_task(task_id)
step = next(filter(lambda s: s.task_id == task_id, task.steps), None)
if not step:
raise NotFoundException("Step", step_id)
return step
async def get_artifact(self, task_id: str, artifact_id: str) -> Artifact:
task = await self.get_task(task_id)
artifact = next(
filter(lambda a: a.artifact_id == artifact_id, task.artifacts), None
)
if not artifact:
raise NotFoundException("Artifact", artifact_id)
return artifact
async def create_artifact(
self,
task_id: str,
file_name: str,
relative_path: Optional[str] = None,
step_id: Optional[str] = None,
) -> Artifact:
artifact_id = str(uuid.uuid4())
artifact = Artifact(
artifact_id=artifact_id,
file_name=file_name,
relative_path=relative_path,
)
task = await self.get_task(task_id)
task.artifacts.append(artifact)
if step_id:
step = await self.get_step(task_id, step_id)
step.artifacts.append(artifact)
return artifact
async def list_tasks(self) -> List[Task]:
return [task for task in self._tasks.values()]
async def list_steps(
self, task_id: str, status: Optional[Status] = None
) -> List[Step]:
task = await self.get_task(task_id)
steps = task.steps
if status:
steps = list(filter(lambda s: s.status == status, steps))
return steps

@@ -1,157 +0,0 @@
import logging
from typing import List
import oceandb
from oceandb.utils.embedding_function import MultiModalEmbeddingFunction
class OceanDB:
"""
A class to interact with OceanDB.
...
Attributes
----------
client : oceandb.Client
a client to interact with OceanDB
Methods
-------
create_collection(collection_name: str, modality: str):
Creates a new collection in OceanDB.
append_document(collection, document: str, id: str):
Appends a document to a collection in OceanDB.
add_documents(collection, documents: List[str], ids: List[str]):
Adds multiple documents to a collection in OceanDB.
query(collection, query_texts: list[str], n_results: int):
Queries a collection in OceanDB.
"""
def __init__(self, client: oceandb.Client = None):
"""
Constructs all the necessary attributes for the OceanDB object.
Parameters
----------
client : oceandb.Client, optional
a client to interact with OceanDB (default is None, which creates a new client)
"""
try:
self.client = client if client else oceandb.Client()
print(self.client.heartbeat())
except Exception as e:
logging.error(f"Failed to initialize OceanDB client. Error: {e}")
raise
def create_collection(self, collection_name: str, modality: str):
"""
Creates a new collection in OceanDB.
Parameters
----------
collection_name : str
the name of the new collection
modality : str
the modality of the new collection
Returns
-------
collection
the created collection
"""
try:
embedding_function = MultiModalEmbeddingFunction(modality=modality)
collection = self.client.create_collection(
collection_name, embedding_function=embedding_function
)
return collection
except Exception as e:
logging.error(f"Failed to create collection. Error {e}")
raise
def append_document(self, collection, document: str, id: str):
"""
Appends a document to a collection in OceanDB.
Parameters
----------
collection
the collection to append the document to
document : str
the document to append
id : str
the id of the document
Returns
-------
result
the result of the append operation
"""
try:
return collection.add(documents=[document], ids=[id])
except Exception as e:
logging.error(
f"Failed to append document to the collection. Error {e}"
)
raise
def add_documents(self, collection, documents: List[str], ids: List[str]):
"""
Adds multiple documents to a collection in OceanDB.
Parameters
----------
collection
the collection to add the documents to
documents : List[str]
the documents to add
ids : List[str]
the ids of the documents
Returns
-------
result
the result of the add operation
"""
try:
return collection.add(documents=documents, ids=ids)
except Exception as e:
logging.error(f"Failed to add documents to collection. Error: {e}")
raise
def query(self, collection, query_texts: list[str], n_results: int):
"""
Queries a collection in OceanDB.
Parameters
----------
collection
the collection to query
query_texts : list[str]
the texts to query
n_results : int
the number of results to return
Returns
-------
results
the results of the query
"""
try:
results = collection.query(
query_texts=query_texts, n_results=n_results
)
return results
except Exception as e:
logging.error(f"Failed to query the collection. Error {e}")
raise
# Example
# ocean = OceanDB()
# collection = ocean.create_collection("test", "text")
# ocean.append_document(collection, "hello world", "1")
# ocean.add_documents(collection, ["hello world", "hello world"], ["2", "3"])
# results = ocean.query(collection, ["hello world"], 3)
# print(results)

@@ -1,12 +1,12 @@
from typing import Optional
from swarms.memory.vector_stores.base import BaseVector
from swarms.memory.base import BaseVectorStore
import pinecone
from attr import define, field
from swarms.utils.hash import str_to_hash
@define
class PineconeVectorStoreStore(BaseVector):
class PineconeVectorStoreStore(BaseVectorStore):
"""
PineconeVectorStore is a vector storage driver that uses Pinecone as the underlying storage engine.
@@ -24,11 +24,11 @@ class PineconeVectorStoreStore(BaseVector):
Methods:
upsert_vector(vector: list[float], vector_id: Optional[str] = None, namespace: Optional[str] = None, meta: Optional[dict] = None, **kwargs) -> str:
Upserts a vector into the index.
load_entry(vector_id: str, namespace: Optional[str] = None) -> Optional[BaseVector.Entry]:
load_entry(vector_id: str, namespace: Optional[str] = None) -> Optional[BaseVectorStore.Entry]:
Loads a single vector from the index.
load_entries(namespace: Optional[str] = None) -> list[BaseVector.Entry]:
load_entries(namespace: Optional[str] = None) -> list[BaseVectorStore.Entry]:
Loads all vectors from the index.
query(query: str, count: Optional[int] = None, namespace: Optional[str] = None, include_vectors: bool = False, include_metadata=True, **kwargs) -> list[BaseVector.QueryResult]:
query(query: str, count: Optional[int] = None, namespace: Optional[str] = None, include_vectors: bool = False, include_metadata=True, **kwargs) -> list[BaseVectorStore.QueryResult]:
Queries the index for vectors similar to the given query string.
create_index(name: str, **kwargs) -> None:
Creates a new index.
@@ -121,7 +121,7 @@ class PineconeVectorStoreStore(BaseVector):
def load_entry(
self, vector_id: str, namespace: Optional[str] = None
) -> Optional[BaseVector.Entry]:
) -> Optional[BaseVectorStore.Entry]:
"""Load entry"""
result = self.index.fetch(
ids=[vector_id], namespace=namespace
@@ -131,7 +131,7 @@ class PineconeVectorStoreStore(BaseVector):
if len(vectors) > 0:
vector = vectors[0]
return BaseVector.Entry(
return BaseVectorStore.Entry(
id=vector["id"],
meta=vector["metadata"],
vector=vector["values"],
@@ -142,7 +142,7 @@ class PineconeVectorStoreStore(BaseVector):
def load_entries(
self, namespace: Optional[str] = None
) -> list[BaseVector.Entry]:
) -> list[BaseVectorStore.Entry]:
"""Load entries"""
# This is a hacky way to query up to 10,000 values from Pinecone. Waiting on an official API for fetching
# all values from a namespace:
@@ -156,7 +156,7 @@ class PineconeVectorStoreStore(BaseVector):
)
return [
BaseVector.Entry(
BaseVectorStore.Entry(
id=r["id"],
vector=r["values"],
meta=r["metadata"],
@@ -174,12 +174,12 @@ class PineconeVectorStoreStore(BaseVector):
# PineconeVectorStoreStorageDriver-specific params:
include_metadata=True,
**kwargs,
) -> list[BaseVector.QueryResult]:
) -> list[BaseVectorStore.QueryResult]:
"""Query vectors"""
vector = self.embedding_driver.embed_string(query)
params = {
"top_k": count if count else BaseVector.DEFAULT_QUERY_COUNT,
"top_k": count if count else BaseVectorStore.DEFAULT_QUERY_COUNT,
"namespace": namespace,
"include_values": include_vectors,
"include_metadata": include_metadata,
@@ -188,7 +188,7 @@ class PineconeVectorStoreStore(BaseVector):
results = self.index.query(vector, **params)
return [
BaseVector.QueryResult(
BaseVectorStore.QueryResult(
id=r["id"],
vector=r["values"],
score=r["score"],

@@ -1,8 +1,3 @@
import sys
log_file = open("errors.txt", "w")
sys.stderr = log_file
# LLMs
from swarms.models.anthropic import Anthropic # noqa: E402
from swarms.models.petals import Petals # noqa: E402

@@ -69,6 +69,10 @@ class BaseMultiModalModel:
device: Optional[str] = "cuda",
max_new_tokens: Optional[int] = 500,
retries: Optional[int] = 3,
system_prompt: Optional[str] = None,
meta_prompt: Optional[str] = None,
*args,
**kwargs,
):
self.model_name = model_name
self.temperature = temperature
@@ -265,3 +269,17 @@ class BaseMultiModalModel:
"""
for chunk in content:
print(chunk)
def meta_prompt(self):
"""Meta Prompt
Returns:
_type_: _description_
"""
META_PROMPT = """
For any labels or markings on an image that you reference in your response, please
enclose them in square brackets ([]) and list them explicitly. Do not use ranges; for
example, instead of '1 - 4', list as '[1], [2], [3], [4]'. These labels could be
numbers or letters and typically correspond to specific segments or parts of the image.
"""
return META_PROMPT

@@ -23,6 +23,12 @@ load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
gpt4_vision_system_prompt = """
You are a multi-modal autonomous agent. You are given a task and an image. You must generate a response to the task and image.
"""
class GPT4VisionAPI:
"""
GPT-4 Vision API
@@ -67,6 +73,10 @@ class GPT4VisionAPI:
openai_proxy: str = "https://api.openai.com/v1/chat/completions",
beautify: bool = False,
streaming_enabled: Optional[bool] = False,
meta_prompt: Optional[bool] = False,
system_prompt: Optional[str] = gpt4_vision_system_prompt,
*args,
**kwargs,
):
super().__init__()
self.openai_api_key = openai_api_key
@@ -77,6 +87,8 @@ class GPT4VisionAPI:
self.openai_proxy = openai_proxy
self.beautify = beautify
self.streaming_enabled = streaming_enabled
self.meta_prompt = meta_prompt
self.system_prompt = system_prompt
if self.logging_enabled:
logging.basicConfig(level=logging.DEBUG)
@@ -85,6 +97,9 @@ class GPT4VisionAPI:
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
if self.meta_prompt:
self.system_prompt = self.meta_prompt_init()
def encode_image(self, img: str):
"""Encode image to base64."""
with open(img, "rb") as image_file:
@@ -110,8 +125,9 @@ class GPT4VisionAPI:
"Authorization": f"Bearer {openai_api_key}",
}
payload = {
"model": "gpt-4-vision-preview",
"model": self.model_name,
"messages": [
{"role": "system", "content": [self.system_prompt]},
{
"role": "user",
"content": [
@@ -125,7 +141,7 @@
},
},
],
}
},
],
"max_tokens": self.max_tokens,
}
@@ -233,7 +249,13 @@
for img in base64_frames:
base64.b64decode(img.encode("utf-8"))
def __call__(self, task: str, img: str):
def __call__(
self,
task: Optional[str] = None,
img: Optional[str] = None,
*args,
**kwargs,
):
"""Run the model."""
try:
base64_image = self.encode_image(img)
@@ -242,8 +264,9 @@
"Authorization": f"Bearer {openai_api_key}",
}
payload = {
"model": "gpt-4-vision-preview",
"model": self.model_name,
"messages": [
{"role": "system", "content": [self.system_prompt]},
{
"role": "user",
"content": [
@@ -257,7 +280,7 @@
},
},
],
}
},
],
"max_tokens": self.max_tokens,
}
@@ -425,3 +448,17 @@ class GPT4VisionAPI:
)
)
return dashboard
# def meta_prompt_init(self):
# """Meta Prompt
# Returns:
# _type_: _description_
# """
# META_PROMPT = """
# For any labels or markings on an image that you reference in your response, please
# enclose them in square brackets ([]) and list them explicitly. Do not use ranges; for
# example, instead of '1 - 4', list as '[1], [2], [3], [4]'. These labels could be
# numbers or letters and typically correspond to specific segments or parts of the image.
# """
# return META_PROMPT

@@ -752,6 +752,21 @@ class OpenAIChat(BaseLLM):
Any parameters that are valid to be passed to the openai.create call can be passed
in, even if not explicitly saved on this class.
Args:
model_name: The model name to use.
model_kwargs: Any additional kwargs to pass to the model.
openai_api_key: The OpenAI API key to use.
openai_api_base: The OpenAI API base to use.
openai_proxy: The OpenAI proxy to use.
max_retries: The maximum number of retries to make when generating.
prefix_messages: The prefix messages to use.
streaming: Whether to stream the results or not.
allowed_special: Set of special tokens that are allowed
disallowed_special: Set of special tokens that are not allowed
Example:
.. code-block:: python
@@ -760,13 +775,10 @@ class OpenAIChat(BaseLLM):
"""
client: Any #: :meta private:
model_name: str = "gpt-3.5-turbo"
"""Model name to use."""
model_name: str = "gpt-3.5-turbo-1106"
model_kwargs: Dict[str, Any] = Field(default_factory=dict)
"""Holds any model parameters valid for `create` call not explicitly specified."""
openai_api_key: Optional[str] = None
openai_api_base: Optional[str] = None
# to support explicit proxy for OpenAI
openai_proxy: Optional[str] = None
max_retries: int = 6
"""Maximum number of retries to make when generating."""

@@ -0,0 +1,291 @@
import cv2
import numpy as np
from PIL import Image
from transformers import SamImageProcessor, SamModel, SamProcessor, pipeline
try:
import cv2
import supervision as sv
except ImportError:
print("Please install supervision and cv")
from enum import Enum
class FeatureType(Enum):
"""
An enumeration to represent the types of features for mask adjustment in image
segmentation.
"""
ISLAND = "ISLAND"
HOLE = "HOLE"
@classmethod
def list(cls):
return list(map(lambda c: c.value, cls))
def compute_mask_iou_vectorized(masks: np.ndarray) -> np.ndarray:
"""
Vectorized computation of the Intersection over Union (IoU) for all pairs of masks.
Parameters:
masks (np.ndarray): A 3D numpy array with shape `(N, H, W)`, where `N` is the
number of masks, `H` is the height, and `W` is the width.
Returns:
np.ndarray: A 2D numpy array of shape `(N, N)` where each element `[i, j]` is
the IoU between masks `i` and `j`.
Raises:
ValueError: If any of the masks is found to be empty.
"""
if np.any(masks.sum(axis=(1, 2)) == 0):
raise ValueError(
"One or more masks are empty. Please filter out empty masks before"
" using `compute_iou_vectorized` function."
)
masks_bool = masks.astype(bool)
masks_flat = masks_bool.reshape(masks.shape[0], -1)
intersection = np.logical_and(masks_flat[:, None], masks_flat[None, :]).sum(
axis=2
)
union = np.logical_or(masks_flat[:, None], masks_flat[None, :]).sum(axis=2)
iou_matrix = intersection / union
return iou_matrix
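# A quick sanity check for compute_mask_iou_vectorized on two overlapping toy
# masks; the off-diagonal entries hold the pairwise IoU (here 4/12 = 1/3):
# masks = np.zeros((2, 4, 4), dtype=bool)
# masks[0, :2, :] = True   # top two rows
# masks[1, 1:3, :] = True  # middle two rows, overlapping one row with mask 0
# print(compute_mask_iou_vectorized(masks))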
def mask_non_max_suppression(
masks: np.ndarray, iou_threshold: float = 0.6
) -> np.ndarray:
"""
Performs Non-Max Suppression on a set of masks by prioritizing larger masks and
removing smaller masks that overlap significantly.
When the IoU between two masks exceeds the specified threshold, the smaller mask
(in terms of area) is discarded. This process is repeated for each pair of masks,
effectively filtering out masks that are significantly overlapped by larger ones.
Parameters:
masks (np.ndarray): A 3D numpy array with shape `(N, H, W)`, where `N` is the
number of masks, `H` is the height, and `W` is the width.
iou_threshold (float): The IoU threshold for determining significant overlap.
Returns:
np.ndarray: A 3D numpy array of filtered masks.
"""
num_masks = masks.shape[0]
areas = masks.sum(axis=(1, 2))
sorted_idx = np.argsort(-areas)
keep_mask = np.ones(num_masks, dtype=bool)
iou_matrix = compute_mask_iou_vectorized(masks)
for i in range(num_masks):
if not keep_mask[sorted_idx[i]]:
continue
overlapping_masks = iou_matrix[sorted_idx[i]] > iou_threshold
overlapping_masks[sorted_idx[i]] = False
keep_mask[sorted_idx] = np.logical_and(
keep_mask[sorted_idx], ~overlapping_masks
)
return masks[keep_mask]
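# Continuing the toy example above: at the default iou_threshold of 0.6 both
# masks survive (their IoU is only 1/3), while a threshold below 1/3 keeps just
# one of the two overlapping, equal-area masks:
# kept = mask_non_max_suppression(masks, iou_threshold=0.25)
# print(kept.shape)  # (1, 4, 4)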
def filter_masks_by_relative_area(
masks: np.ndarray, minimum_area: float = 0.01, maximum_area: float = 1.0
) -> np.ndarray:
"""
Filters masks based on their relative area within the total area of each mask.
Parameters:
masks (np.ndarray): A 3D numpy array with shape `(N, H, W)`, where `N` is the
number of masks, `H` is the height, and `W` is the width.
minimum_area (float): The minimum relative area threshold. Must be between `0`
and `1`.
maximum_area (float): The maximum relative area threshold. Must be between `0`
and `1`.
Returns:
np.ndarray: A 3D numpy array containing masks that fall within the specified
relative area range.
Raises:
ValueError: If `minimum_area` or `maximum_area` are outside the `0` to `1`
range, or if `minimum_area` is greater than `maximum_area`.
"""
if not (isinstance(masks, np.ndarray) and masks.ndim == 3):
raise ValueError("Input must be a 3D numpy array.")
if not (0 <= minimum_area <= 1) or not (0 <= maximum_area <= 1):
raise ValueError(
"`minimum_area` and `maximum_area` must be between 0 and 1."
)
if minimum_area > maximum_area:
raise ValueError(
"`minimum_area` must be less than or equal to `maximum_area`."
)
total_area = masks.shape[1] * masks.shape[2]
relative_areas = masks.sum(axis=(1, 2)) / total_area
return masks[
(relative_areas >= minimum_area) & (relative_areas <= maximum_area)
]
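# For example, dropping masks that cover less than 1% or more than 90% of the
# frame:
# filtered = filter_masks_by_relative_area(
#     masks, minimum_area=0.01, maximum_area=0.9
# )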
def adjust_mask_features_by_relative_area(
mask: np.ndarray,
area_threshold: float,
feature_type: FeatureType = FeatureType.ISLAND,
) -> np.ndarray:
"""
Adjusts a mask by removing small islands or filling small holes based on a relative
area threshold.
!!! warning
Running this function on a mask with small islands may result in empty masks.
Parameters:
mask (np.ndarray): A 2D numpy array with shape `(H, W)`, where `H` is the
height, and `W` is the width.
area_threshold (float): Threshold for relative area to remove or fill features.
feature_type (FeatureType): Type of feature to adjust (`ISLAND` for removing
islands, `HOLE` for filling holes).
Returns:
np.ndarray: A 2D numpy array containing the adjusted mask.
"""
height, width = mask.shape
total_area = width * height
mask = np.uint8(mask * 255)
operation = (
cv2.RETR_EXTERNAL
if feature_type == FeatureType.ISLAND
else cv2.RETR_CCOMP
)
contours, _ = cv2.findContours(mask, operation, cv2.CHAIN_APPROX_SIMPLE)
for contour in contours:
area = cv2.contourArea(contour)
relative_area = area / total_area
if relative_area < area_threshold:
cv2.drawContours(
image=mask,
contours=[contour],
contourIdx=-1,
color=(0 if feature_type == FeatureType.ISLAND else 255),
thickness=-1,
)
return np.where(mask > 0, 1, 0).astype(bool)
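# For instance, removing tiny islands below 1% of the frame area from a single
# binary mask:
# cleaned = adjust_mask_features_by_relative_area(
#     mask=masks[0], area_threshold=0.01, feature_type=FeatureType.ISLAND
# )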
def masks_to_marks(masks: np.ndarray) -> sv.Detections:
"""
Converts a set of masks to a marks (sv.Detections) object.
Parameters:
masks (np.ndarray): A 3D numpy array with shape `(N, H, W)`, where `N` is the
number of masks, `H` is the height, and `W` is the width.
Returns:
sv.Detections: An object containing the masks and their bounding box
coordinates.
"""
return sv.Detections(mask=masks, xyxy=sv.mask_to_xyxy(masks=masks))
def refine_marks(
marks: sv.Detections,
maximum_hole_area: float = 0.01,
maximum_island_area: float = 0.01,
minimum_mask_area: float = 0.02,
maximum_mask_area: float = 1.0,
) -> sv.Detections:
"""
Refines a set of masks by removing small islands and holes, and filtering by mask
area.
Parameters:
marks (sv.Detections): An object containing the masks and their bounding box
coordinates.
maximum_hole_area (float): The maximum relative area of holes to be filled in
each mask.
maximum_island_area (float): The maximum relative area of islands to be removed
from each mask.
minimum_mask_area (float): The minimum relative area for a mask to be retained.
maximum_mask_area (float): The maximum relative area for a mask to be retained.
Returns:
sv.Detections: An object containing the masks and their bounding box
coordinates.
"""
result_masks = []
for mask in marks.mask:
mask = adjust_mask_features_by_relative_area(
mask=mask,
area_threshold=maximum_island_area,
feature_type=FeatureType.ISLAND,
)
mask = adjust_mask_features_by_relative_area(
mask=mask,
area_threshold=maximum_hole_area,
feature_type=FeatureType.HOLE,
)
if np.any(mask):
result_masks.append(mask)
result_masks = np.array(result_masks)
result_masks = filter_masks_by_relative_area(
masks=result_masks,
minimum_area=minimum_mask_area,
maximum_area=maximum_mask_area,
)
return sv.Detections(
mask=result_masks, xyxy=sv.mask_to_xyxy(masks=result_masks)
)
class SegmentAnythingMarkGenerator:
"""
A class for performing image segmentation using a specified model.
Parameters:
device (str): The device to run the model on (e.g., 'cpu', 'cuda').
model_name (str): The name of the model to be loaded. Defaults to
'facebook/sam-vit-huge'.
"""
def __init__(
self, device: str = "cpu", model_name: str = "facebook/sam-vit-huge"
):
self.model = SamModel.from_pretrained(model_name).to(device)
self.processor = SamProcessor.from_pretrained(model_name)
self.image_processor = SamImageProcessor.from_pretrained(model_name)
self.pipeline = pipeline(
task="mask-generation",
model=self.model,
image_processor=self.image_processor,
device=device,
)
def run(self, image: np.ndarray) -> sv.Detections:
"""
Generate image segmentation marks.
Parameters:
image (np.ndarray): The image to be marked in BGR format.
Returns:
sv.Detections: An object containing the segmentation masks and their
corresponding bounding box coordinates.
"""
image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
outputs = self.pipeline(image, points_per_batch=64)
masks = np.array(outputs["masks"])
return masks_to_marks(masks=masks)
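# A minimal usage sketch for the generator above, assuming a local image file
# and the default 'facebook/sam-vit-huge' checkpoint:
# generator = SegmentAnythingMarkGenerator(device="cpu")
# image = cv2.imread("example.jpg")  # BGR, as run() expects
# marks = generator.run(image)
# print(len(marks.mask), "masks generated")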

@@ -0,0 +1,11 @@
# System prompt
FLOW_SYSTEM_PROMPT = """
You are an autonomous agent operating within an autonomous loop structure.
Your role is to engage in multi-step conversations with yourself or the user,
generate long-form content like blogs, screenplays, or SOPs,
and accomplish tasks given to you by the user.
You can hold internal dialogues with yourself or interact with the user
to aid in these complex tasks. Your responses should be coherent, contextually relevant, and tailored to the task at hand.
"""

@@ -0,0 +1,99 @@
VISION_PROMPT = """
You are a Self-Operating Computer. You use the same operating system as a human.
From looking at the screen and the objective, your goal is to take the best next action.
To operate the computer you have the four options below.
1. CLICK - Move mouse and click
2. TYPE - Type on the keyboard
3. SEARCH - Search for a program on Mac and open it
4. DONE - When you have completed the task, respond with the exact following phrase
Here are the response formats below.
1. CLICK
Response: CLICK {{ "x": "percent", "y": "percent", "description": "~description here~", "reason": "~reason here~" }}
2. TYPE
Response: TYPE "value you want to type"
3. SEARCH
Response: SEARCH "app you want to search for on Mac"
4. DONE
Response: DONE
Here are examples of how to respond.
__
Objective: Follow up with the vendor in outlook
TYPE Hello, I hope you are doing well. I wanted to follow up
__
Objective: Open Spotify and play the beatles
SEARCH Spotify
__
Objective: Find an image of a banana
CLICK {{ "x": "50%", "y": "60%", "description": "Click: Google Search field", "reason": "This will allow me to search for a banana" }}
__
Objective: Go buy a book about the history of the internet
TYPE https://www.amazon.com/
__
A few important notes:
- Default to opening Google Chrome with SEARCH to find things that are on the internet.
- Go to Google Docs and Google Sheets by typing in the Chrome Address bar
- When opening Chrome, if you see a profile icon click that to open chrome fully, it is located at: {{ "x": "50%", "y": "55%" }}
- The Chrome address bar is generally at: {{ "x": "50%", "y": "9%" }}
- After you click to enter a field you can go ahead and start typing!
{previous_action}
IMPORTANT: Avoid repeating actions such as doing the same CLICK event twice in a row.
Objective: {objective}
"""
USER_QUESTION = "Hello, I can help you with anything. What would you like done?"
SUMMARY_PROMPT = """
You are a Self-Operating Computer. You just completed a request from a user by operating the computer. Now you need to share the results.
You have three pieces of key context about the completed request.
1. The original objective
2. The steps you took to reach the objective that are available in the previous messages
3. The screenshot you are looking at.
Now you need to summarize what you did to reach the objective. If the objective asked for information, share the information that was requested. IMPORTANT: Don't forget to answer a user's question if they asked one.
Note: the user cannot respond to your summary. You are just sharing the results of your work.
The original objective was: {objective}
Now share the results!
"""
def format_summary_prompt(objective):
"""
Format the summary prompt
"""
prompt = SUMMARY_PROMPT.format(objective=objective)
return prompt
def format_vision_prompt(objective, previous_action):
"""
Format the vision prompt
"""
if previous_action:
previous_action = (
f"Here was the previous action you took: {previous_action}"
)
else:
previous_action = ""
prompt = VISION_PROMPT.format(
objective=objective, previous_action=previous_action
)
return prompt
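# For example, formatting the vision prompt for a fresh objective with no prior
# action fills the template with an empty previous-action section:
# prompt = format_vision_prompt(
#     objective="Open Spotify and play the beatles", previous_action=None
# )
# print(prompt[:120])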

@@ -0,0 +1,60 @@
# Prompts
DYNAMIC_STOP_PROMPT = """
Now, when you are 99% sure you have completed the task, you may follow the instructions below to escape the autonomous loop.
When you have finished the task from the Human, output a special token: <DONE>
This will enable you to leave the autonomous loop.
"""
# Make it able to handle multi input tools
DYNAMICAL_TOOL_USAGE = """
You have access to the following tools:
Output a JSON object with the following structure to use the tools
commands: {
    "tools": {
        "tool1": "tool_name",
        "params": {
            "input1": "value",
            "input2": "value"
        },
        "tool2": "tool_name",
        "params": {
            "input1": "value",
            "input2": "value"
        },
        "tool3": "tool_name",
        "params": {
            "input1": "value",
            "input2": "value"
        }
    }
}
-------------TOOLS---------------------------
{tools}
"""
SCENARIOS = """
commands: {
    "tools": {
        "tool1": "tool_name",
        "params": {
            "input1": "value",
            "input2": "value"
        },
        "tool2": "tool_name",
        "params": {
            "input1": "value",
            "input2": "value"
        },
        "tool3": "tool_name",
        "params": {
            "input1": "value",
            "input2": "value"
        }
    }
}
"""

@@ -5,92 +5,25 @@ import logging
import random
import re
import time
import uuid
from typing import Any, Callable, Dict, List, Optional, Tuple
from termcolor import colored
from swarms.tools.tool import BaseTool
from swarms.utils.code_interpreter import SubprocessCodeInterpreter
from swarms.utils.parse_code import extract_code_in_backticks_in_string
from swarms.prompts.agent_system_prompts import FLOW_SYSTEM_PROMPT
from swarms.prompts.multi_modal_autonomous_instruction_prompt import (
MULTI_MODAL_AUTO_AGENT_SYSTEM_PROMPT_1,
)
from swarms.prompts.tools import (
DYNAMIC_STOP_PROMPT,
DYNAMICAL_TOOL_USAGE,
SCENARIOS,
)
from swarms.tools.tool import BaseTool
from swarms.utils.code_interpreter import SubprocessCodeInterpreter
from swarms.utils.parse_code import extract_code_in_backticks_in_string
from swarms.utils.pdf_to_text import pdf_to_text
# System prompt
FLOW_SYSTEM_PROMPT = f"""
You are an autonomous agent granted autonomy in a autonomous loop structure.
Your role is to engage in multi-step conversations with your self or the user,
generate long-form content like blogs, screenplays, or SOPs,
and accomplish tasks bestowed by the user.
You can have internal dialogues with yourself or can interact with the user
to aid in these complex tasks. Your responses should be coherent, contextually relevant, and tailored to the task at hand.
"""
# Prompts
DYNAMIC_STOP_PROMPT = """
Now, when you 99% sure you have completed the task, you may follow the instructions below to escape the autonomous loop.
When you have finished the task from the Human, output a special token: <DONE>
This will enable you to leave the autonomous loop.
"""
# Make it able to handle multi input tools
DYNAMICAL_TOOL_USAGE = """
You have access to the following tools:
Output a JSON object with the following structure to use the tools
commands: {
"tools": {
tool1: "tool_name",
"params": {
"tool1": "inputs",
"tool1": "inputs"
}
"tool2: "tool_name",
"params": {
"tool1": "inputs",
"tool1": "inputs"
}
"tool3: "tool_name",
"params": {
"tool1": "inputs",
"tool1": "inputs"
}
}
}
-------------TOOLS---------------------------
{tools}
"""
SCENARIOS = """
commands: {
"tools": {
tool1: "tool_name",
"params": {
"tool1": "inputs",
"tool1": "inputs"
}
"tool2: "tool_name",
"params": {
"tool1": "inputs",
"tool1": "inputs"
}
"tool3: "tool_name",
"params": {
"tool1": "inputs",
"tool1": "inputs"
}
}
}
"""
def autonomous_agent_prompt(
tools_prompt: str = DYNAMICAL_TOOL_USAGE,
@@ -137,6 +70,11 @@ def parse_done_token(response: str) -> bool:
return "<DONE>" in response
def agent_id():
"""Generate an agent id"""
return str(uuid.uuid4())
class Agent:
"""
Agent is the structure that provides autonomy to any LLM in a reliable and effective fashion.
@@ -162,52 +100,47 @@ class Agent:
**kwargs (Any): Any additional keyword arguments
Methods:
run: Run the autonomous agent loop
run_concurrent: Run the autonomous agent loop concurrently
bulk_run: Run the autonomous agent loop in bulk
save: Save the agent history to a file
load: Load the agent history from a file
validate_response: Validate the response based on certain criteria
print_history_and_memory: Print the history and memory of the agent
step: Execute a single step in the agent interaction
graceful_shutdown: Gracefully shutdown the system saving the state
run_with_timeout: Run the loop but stop if it takes longer than the timeout
analyze_feedback: Analyze the feedback for issues
undo_last: Response the last response and return the previous state
add_response_filter: Add a response filter to filter out certain words from the response
apply_reponse_filters: Apply the response filters to the response
filtered_run: Filter the response
interactive_run: Interactive run mode
streamed_generation: Stream the generation of the response
get_llm_params: Extracts and returns the parameters of the llm object for serialization.
agent_history_prompt: Generate the agent history prompt
add_task_to_memory: Add the task to the memory
add_message_to_memory: Add the message to the memory
add_message_to_memory_and_truncate: Add the message to the memory and truncate
print_dashboard: Print dashboard
activate_autonomous_agent: Print the autonomous agent activation message
_check_stopping_condition: Check if the stopping condition is met
format_prompt: Format the prompt
get_llm_init_params: Get the llm init params
provide_feedback: Allow users to provide feedback on the responses
truncate_history: Take the history and truncate it to fit into the model context length
agent_history_prompt: Generate the agent history prompt
extract_tool_commands: Extract the tool commands from the text
parse_and_execute_tools: Parse and execute the tools
execute_tools: Execute the tool with the provided parameters
construct_dynamic_prompt: Construct the dynamic prompt
get_tool_description: Get the tool description
find_tool_by_name: Find a tool by name
parse_tool_command: Parse the text for tool usage
dynamic_temperature: Dynamically change the temperature
_run: Generate a result using the provided keyword args.
from_llm_and_template: Create AgentStream from LLM and a string template.
from_llm_and_template_file: Create AgentStream from LLM and a template file.
save_state: Save the state of the agent
load_state: Load the state of the agent
run_async: Run the agent asynchronously
arun: Run the agent asynchronously
run_code: Run the code in the response
run(task: str, **kwargs: Any): Run the agent on a task
run_concurrent(tasks: List[str], **kwargs: Any): Run the agent on a list of tasks concurrently
bulk_run(inputs: List[Dict[str, Any]]): Run the agent on a list of inputs
from_llm_and_template(llm: Any, template: str): Create AgentStream from LLM and a string template.
from_llm_and_template_file(llm: Any, template_file: str): Create AgentStream from LLM and a template file.
save(file_path): Save the agent history to a file
load(file_path): Load the agent history from a file
validate_response(response: str): Validate the response based on certain criteria
print_history_and_memory(): Print the entire history and memory of the agent
step(task: str, **kwargs): Executes a single step in the agent interaction, generating a response from the language model based on the given input text.
graceful_shutdown(): Gracefully shutdown the system saving the state
run_with_timeout(task: str, timeout: int): Run the loop but stop if it takes longer than the timeout
analyze_feedback(): Analyze the feedback for issues
undo_last(): Revert the last response and return the previous state
add_response_filter(filter_word: str): Add a response filter to filter out certain words from the response
apply_reponse_filters(response: str): Apply the response filters to the response
filtered_run(task: str): Filtered run
interactive_run(max_loops: int): Interactive run mode
streamed_generation(prompt: str): Stream the generation of the response
get_llm_params(): Extracts and returns the parameters of the llm object for serialization.
save_state(file_path: str): Saves the current state of the agent to a JSON file, including the llm parameters.
load_state(file_path: str): Loads the state of the agent from a json file and restores the configuration and memory.
retry_on_failure(function, retries: int = 3, retry_delay: int = 1): Retry wrapper for LLM calls.
run_code(response: str): Run the code in the response
construct_dynamic_prompt(): Construct the dynamic prompt
extract_tool_commands(text: str): Extract the tool commands from the text
parse_and_execute_tools(response: str): Parse and execute the tools
execute_tools(tool_name, params): Execute the tool with the provided params
truncate_history(): Take the history and truncate it to fit into the model context length
add_task_to_memory(task: str): Add the task to the memory
add_message_to_memory(message: str): Add the message to the memory
add_message_to_memory_and_truncate(message: str): Add the message to the memory and truncate
print_dashboard(task: str): Print dashboard
activate_autonomous_agent(): Print the autonomous agent activation message
dynamic_temperature(): Dynamically change the temperature
_check_stopping_condition(response: str): Check if the stopping condition is met
format_prompt(template, **kwargs: Any): Format the template with the provided kwargs using f-string interpolation.
get_llm_init_params(): Get LLM init params
get_tool_description(): Get the tool description
find_tool_by_name(name: str): Find a tool by name
Example:
>>> from swarms.models import OpenAIChat
@@ -227,7 +160,8 @@ class Agent:
def __init__(
self,
llm: Any,
id: Optional[str] = None,
llm: Any = None,
template: Optional[str] = None,
max_loops=5,
stopping_condition: Optional[Callable[[str], bool]] = None,
@@ -259,6 +193,7 @@
*args,
**kwargs: Any,
):
self.id = id or agent_id()  # generate a fresh uuid when no id is supplied
self.llm = llm
self.template = template
self.max_loops = max_loops
@@ -471,6 +406,7 @@
----------------------------------------
Agent Configuration:
Agent ID: {self.id}
Name: {self.agent_name}
Description: {self.agent_description}
Standard Operating Procedure: {self.sop}
@@ -519,6 +455,45 @@
)
)
print(error)
def loop_count_print(self, loop_count, max_loops):
"""loop_count_print summary
Args:
loop_count (_type_): _description_
max_loops (_type_): _description_
"""
print(
colored(f"\nLoop {loop_count} of {max_loops}", "cyan")
)
print("\n")
def _history(self, user_name: str, task: str) -> List[str]:
    """Seed the conversation history with the user's task.

    Args:
        user_name (str): Name of the user issuing the task.
        task (str): The task description.

    Returns:
        List[str]: The initial history containing the user's task.
    """
history = [f"{user_name}: {task}"]
return history
def _dynamic_prompt_setup(self, dynamic_prompt: str, task: str) -> str:
"""_dynamic_prompt_setup summary
Args:
dynamic_prompt (str): _description_
task (str): _description_
Returns:
str: _description_
"""
dynamic_prompt = dynamic_prompt or self.construct_dynamic_prompt()
combined_prompt = f"{dynamic_prompt}\n{task}"
return combined_prompt
def run(self, task: Optional[str], img: Optional[str] = None, **kwargs):
"""
@@ -536,14 +511,11 @@
"""
try:
# dynamic_prompt = self.construct_dynamic_prompt()
# combined_prompt = f"{dynamic_prompt}\n{task}"
# Activate Autonomous agent message
self.activate_autonomous_agent()
response = task # or combined_prompt
history = [f"{self.user_name}: {task}"]
history = self._history(self.user_name, task)
# If dashboard = True then print the dashboard
if self.dashboard:
@@ -555,9 +527,7 @@
while self.max_loops == "auto" or loop_count < self.max_loops:
# Loop count
loop_count += 1
print(
colored(f"\nLoop {loop_count} of {self.max_loops}", "blue")
)
self.loop_count_print(loop_count, self.max_loops)
print("\n")
# Check to see if stopping token is in the output to stop the loop
@@ -1197,14 +1167,14 @@ class Agent:
"""
def self_healing(self, **kwargs):
"""
Self healing by debugging errors and refactoring its own code
# def self_healing(self, **kwargs):
# """
# Self healing by debugging errors and refactoring its own code
Args:
**kwargs (Any): Any additional keyword arguments
"""
pass
# Args:
# **kwargs (Any): Any additional keyword arguments
# """
# pass
# def refactor_code(
# self,
@@ -1229,29 +1199,29 @@ class Agent:
# # Sort the changes in reverse line order
# # explanations.sort(key=lambda x: x["line", reverse=True])
# # def error_prompt_inject(
# # self,
# # file_path: str,
# # args: List,
# # error: str,
# # ):
# # with open(file_path, "r") as f:
# # file_lines = f.readlines()
# # file_with_lines = []
# # for i, line in enumerate(file_lines):
# # file_with_lines.append(str(i + 1) + "" + line)
# # file_with_lines = "".join(file_with_lines)
# # prompt = f"""
# # Here is the script that needs fixing:\n\n
# # {file_with_lines}\n\n
# # Here are the arguments it was provided:\n\n
# # {args}\n\n
# # Here is the error message:\n\n
# # {error}\n
# # "Please provide your suggested changes, and remember to stick to the "
# # exact format as described above.
# # """
# # # Print(prompt)
# def error_prompt_inject(
# self,
# file_path: str,
# args: List,
# error: str,
# ):
# with open(file_path, "r") as f:
# file_lines = f.readlines()
# file_with_lines = []
# for i, line in enumerate(file_lines):
# file_with_lines.append(str(i + 1) + "" + line)
# file_with_lines = "".join(file_with_lines)
# prompt = f"""
# Here is the script that needs fixing:\n\n
# {file_with_lines}\n\n
# Here are the arguments it was provided:\n\n
# {args}\n\n
# Here is the error message:\n\n
# {error}\n
# "Please provide your suggested changes, and remember to stick to the "
# exact format as described above.
# """
# print(prompt)

@@ -1,97 +0,0 @@
from swarms.models import OpenAIChat
from swarms.structs.agent import Agent
import concurrent.futures
from typing import Callable, List, Dict, Any, Sequence
class Task:
def __init__(
self,
id: str,
task: str,
flows: Sequence[Agent],
dependencies: List[str] = [],
):
self.id = id
self.task = task
self.flows = flows
self.dependencies = dependencies
self.results = []
def execute(self, parent_results: Dict[str, Any]):
args = [parent_results[dep] for dep in self.dependencies]
for agent in self.flows:
result = agent.run(self.task, *args)
self.results.append(result)
args = [
result
] # The output of one agent becomes the input to the next
class Workflow:
def __init__(self):
self.tasks: Dict[str, Task] = {}
self.executor = concurrent.futures.ThreadPoolExecutor()
def add_task(self, task: Task):
self.tasks[task.id] = task
def run(self):
completed_tasks = set()
while len(completed_tasks) < len(self.tasks):
futures = []
for task in self.tasks.values():
if task.id not in completed_tasks and all(
dep in completed_tasks for dep in task.dependencies
):
future = self.executor.submit(
task.execute,
{
dep: self.tasks[dep].results
for dep in task.dependencies
},
)
futures.append((future, task.id))
for future, task_id in futures:
future.result() # Wait for task completion
completed_tasks.add(task_id)
def get_results(self):
return {task_id: task.results for task_id, task in self.tasks.items()}
# create flows
llm = OpenAIChat(openai_api_key="sk-")
flow1 = Agent(llm, max_loops=1)
flow2 = Agent(llm, max_loops=1)
flow3 = Agent(llm, max_loops=1)
flow4 = Agent(llm, max_loops=1)
# Create tasks with their respective Agents and task strings
task1 = Task("task1", "Generate a summary on Quantum field theory", [flow1])
task2 = Task(
"task2",
"Elaborate on the summary of topic X",
[flow2, flow3],
dependencies=["task1"],
)
task3 = Task(
"task3", "Generate conclusions for topic X", [flow4], dependencies=["task1"]
)
# Create a workflow and add tasks
workflow = Workflow()
workflow.add_task(task1)
workflow.add_task(task2)
workflow.add_task(task3)
# Run the workflow
workflow.run()
# Get results
results = workflow.get_results()
print(results)

@@ -0,0 +1,49 @@
from swarms.structs.agent import Agent
from typing import List, Dict, Any, Sequence
class Task:
"""
Task is a unit of work that can be executed by a set of agents.
A task is defined by a task name and a set of agents that can execute the task.
The task can also have a set of dependencies, which are the names of other tasks
that must be executed before this task can be executed.
Args:
id (str): A unique identifier for the task
task (str): The name of the task
agents (Sequence[Agent]): A list of agents that can execute the task
dependencies (List[str], optional): A list of task names that must be executed before this task can be executed. Defaults to [].
Methods:
execute(parent_results: Dict[str, Any]): Executes the task by passing the results of the parent tasks to the agents.
"""
def __init__(
self,
id: str,
task: str,
agents: Sequence[Agent],
dependencies: List[str] = [],
):
self.id = id
self.task = task
self.agents = agents
self.dependencies = dependencies
self.results = []
def execute(self, parent_results: Dict[str, Any]):
"""Executes the task by passing the results of the parent tasks to the agents.
Args:
parent_results (Dict[str, Any]): A mapping from parent task id to that task's results.
"""
args = [parent_results[dep] for dep in self.dependencies]
for agent in self.agents:
result = agent.run(self.task, *args)
self.results.append(result)
args = [
result
] # The output of one agent becomes the input to the next
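# A minimal wiring sketch for the Task above (the llm construction is an
# assumption; the Agent signature matches the rest of this commit):
# llm = OpenAIChat(openai_api_key=os.getenv("OPENAI_API_KEY"))
# summarize = Task("task1", "Summarize quantum field theory", [Agent(llm=llm, max_loops=1)])
# elaborate = Task(
#     "task2",
#     "Elaborate on the summary",
#     [Agent(llm=llm, max_loops=1)],
#     dependencies=["task1"],
# )
# summarize.execute({})
# elaborate.execute({"task1": summarize.results})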

@@ -1,9 +1,12 @@
import logging
import os
import warnings
import sys
def disable_logging():
log_file = open("errors.txt", "w")
sys.stderr = log_file
warnings.filterwarnings("ignore", category=UserWarning)
# disable tensorflow warnings

@@ -0,0 +1,107 @@
import os
from unittest.mock import Mock
import pytest
from dotenv import load_dotenv
from swarms.models.gpt4_vision_api import GPT4VisionAPI
from swarms.prompts.multi_modal_autonomous_instruction_prompt import (
MULTI_MODAL_AUTO_AGENT_SYSTEM_PROMPT_1,
)
from swarms.structs.agent import Agent
from swarms.structs.task import Task
load_dotenv()
@pytest.fixture
def llm():
return GPT4VisionAPI()
def test_agent_run_task(llm):
task = (
"Analyze this image of an assembly line and identify any issues such as"
" misaligned parts, defects, or deviations from the standard assembly"
" process. IF there is anything unsafe in the image, explain why it is"
" unsafe and how it could be improved."
)
img = "assembly_line.jpg"
agent = Agent(
llm=llm,
max_loops="auto",
sop=MULTI_MODAL_AUTO_AGENT_SYSTEM_PROMPT_1,
dashboard=True,
)
result = agent.run(task=task, img=img)
# Add assertions here to verify the expected behavior of the agent's run method
assert isinstance(result, dict)
assert "response" in result
assert "dashboard_data" in result
# Add more assertions as needed
@pytest.fixture
def task(llm):
    agents = [Agent(llm=llm, id=f"Agent_{i}") for i in range(5)]
return Task(id="Task_1", task="Task_Name", agents=agents, dependencies=[])
# Basic tests
def test_task_init(task):
assert task.id == "Task_1"
assert task.task == "Task_Name"
assert isinstance(task.agents, list)
assert len(task.agents) == 5
assert isinstance(task.dependencies, list)
def test_task_execute(task, mocker):
mocker.patch.object(Agent, "run", side_effect=[1, 2, 3, 4, 5])
parent_results = {}
task.execute(parent_results)
assert isinstance(task.results, list)
assert len(task.results) == 5
for result in task.results:
assert isinstance(result, int)
# Parameterized tests
@pytest.mark.parametrize("num_agents", [1, 3, 5, 10])
def test_task_num_agents(task, num_agents, mocker):
task.agents = [Agent(id=f"Agent_{i}") for i in range(num_agents)]
mocker.patch.object(Agent, "run", return_value=1)
parent_results = {}
task.execute(parent_results)
assert len(task.results) == num_agents
# Exception testing
def test_task_execute_with_dependency_error(task, mocker):
task.dependencies = ["NonExistentTask"]
mocker.patch.object(Agent, "run", return_value=1)
parent_results = {}
with pytest.raises(KeyError):
task.execute(parent_results)
# Mocking and monkeypatching tests
def test_task_execute_with_mocked_agents(task, mocker):
mock_agents = [Mock(spec=Agent) for _ in range(5)]
mocker.patch.object(task, "agents", mock_agents)
for mock_agent in mock_agents:
mock_agent.run.return_value = 1
parent_results = {}
task.execute(parent_results)
assert len(task.results) == 5