Merge pull request #2 from kyegomez/master

Catching up 20240121 1236
evelynmitchell 12 months ago committed by GitHub
commit 317b274ed6

@ -28,9 +28,15 @@ Run example in Collab: <a target="_blank" href="https://colab.research.google.co
</a>
### `Agent`
- Reliable structure that provides LLMs autonomy
- Extremely customizable with stopping conditions, interactivity, dynamic temperature, loop intervals, and much more
- Enterprise Grade + Production Grade: `Agent` is designed and optimized for automating real-world tasks at scale!
A fully plug-and-play autonomous agent powered by an LLM, extended by a long-term memory database, and equipped with function calling for tool usage! By passing in an LLM you can create a fully autonomous agent with extreme customization and reliability, ready for real-world task automation. (A consolidated sketch follows the snippet below.)
Features:
✅ Any LLM / Any framework
✅ Extremely customizable with max loops, autosaving, document ingestion (PDFs, TXT, CSVs, etc.), tool usage, and more
✅ Long-term memory database with RAG (ChromaDB, Pinecone, Qdrant)
```python
import os
@ -38,8 +44,7 @@ import os
from dotenv import load_dotenv
# Import the OpenAIChat model and the Agent struct
from swarms.models import OpenAIChat
from swarms.structs import Agent
from swarms import OpenAIChat, Agent
# Load the environment variables
load_dotenv()
@ -68,12 +73,13 @@ agent.run("Generate a 10,000 word blog on health and wellness.")
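For reference, here is a consolidated, runnable version of the example above: a minimal sketch assuming the post-change top-level imports and the `Agent` arguments shown elsewhere in this commit (`max_loops`, `dashboard`).

```python
import os

from dotenv import load_dotenv

# Post-change import style introduced by this commit
from swarms import OpenAIChat, Agent

# Load the environment variables
load_dotenv()

# Initialize the LLM with the API key from the environment
llm = OpenAIChat(openai_api_key=os.getenv("OPENAI_API_KEY"))

# Wrap the LLM in an autonomous agent: one loop, no dashboard
agent = Agent(llm=llm, max_loops=1, dashboard=False)

agent.run("Generate a 10,000 word blog on health and wellness.")
```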
### `ToolAgent`
ToolAgent is an agent that outputs JSON using any model from Hugging Face. Give it an example schema with fields along with a simple task, and it will output JSON! Perfect for function calling, parallel, and multi-step tool usage! (An assembled sketch follows this section's example.)
- Versatility: The ToolAgent class is designed to be flexible and adaptable. It can be used with any model and tokenizer, making it suitable for a wide range of tasks. This versatility means that you can use ToolAgent as a foundation for any tool that requires language model processing.
Versatility: The ToolAgent class is designed to be flexible and adaptable. It can be used with any model and tokenizer, making it suitable for a wide range of tasks. This versatility means that you can use ToolAgent as a foundation for any tool that requires language model processing.
- Ease of Use: With its simple and intuitive interface, ToolAgent makes it easy to perform complex tasks. Just initialize it with your model, tokenizer, and JSON schema, and then call the run method with your task. This ease of use allows you to focus on your task, not on setting up your tools.
Ease of Use: With its simple and intuitive interface, ToolAgent makes it easy to perform complex tasks. Just initialize it with your model, tokenizer, and JSON schema, and then call the run method with your task. This ease of use allows you to focus on your task, not on setting up your tools.
- Customizability: ToolAgent accepts variable length arguments and keyword arguments, allowing you to customize its behavior to suit your needs. Whether you need to adjust the temperature of the model's output, limit the number of tokens, or tweak any other parameter, ToolAgent has you covered. This customizability ensures that ToolAgent can adapt to your specific requirements.
Customizability: ToolAgent accepts variable length arguments and keyword arguments, allowing you to customize its behavior to suit your needs. Whether you need to adjust the temperature of the model's output, limit the number of tokens, or tweak any other parameter, ToolAgent has you covered. This customizability ensures that ToolAgent can adapt to your specific requirements.
```python
@ -108,15 +114,18 @@ print(generated_data)
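A sketch assembled from the description above: initialize ToolAgent with a model, tokenizer, and JSON schema, then call `run` with a task. The model checkpoint, schema contents, and the top-level `ToolAgent` import are assumptions.

```python
from transformers import AutoModelForCausalLM, AutoTokenizer

from swarms import ToolAgent  # top-level import assumed

# Any Hugging Face causal LM works; this checkpoint is just an example
model = AutoModelForCausalLM.from_pretrained("databricks/dolly-v2-12b")
tokenizer = AutoTokenizer.from_pretrained("databricks/dolly-v2-12b")

# The example schema the agent's JSON output must follow
json_schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "age": {"type": "number"},
        "is_student": {"type": "boolean"},
    },
}

agent = ToolAgent(model=model, tokenizer=tokenizer, json_schema=json_schema)
generated_data = agent.run("Generate a person's information based on the schema.")
print(generated_data)
```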
------
### `SequentialWorkflow`
- A Sequential swarm of autonomous agents where each agent's outputs are fed into the next agent
- Save and Restore Workflow states!
- Integrate Agent's with various LLMs and Multi-Modality Models
`SequentialWorkflow` executes tasks sequentially with `Agent`, passing each output into the next agent until the specified max loops are reached. `SequentialWorkflow` is wonderful for real-world business tasks like sending emails, summarizing documents, and analyzing data.
✅ Save and Restore Workflow states!
✅ Multi-Modal Support for Visual Chaining
✅ Utilizes Agent class
```python
import os
from swarms.models import OpenAIChat
from swarms.structs import Agent
from swarms.structs.sequential_workflow import SequentialWorkflow
from swarms import OpenAIChat, Agent, SequentialWorkflow
from dotenv import load_dotenv
load_dotenv()
@ -167,7 +176,9 @@ for task in workflow.tasks:
### `ConcurrentWorkflow`
- Run all the tasks all at the same time
`ConcurrentWorkflow` runs all the tasks at the same time with the inputs you give it! (A short sketch follows the example below.)
```python
import os
from dotenv import load_dotenv
@ -199,7 +210,7 @@ workflow.run()
```
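A short sketch, assuming `ConcurrentWorkflow` follows the same `add()`/`run()` pattern as the other workflow structs in this README; the top-level imports and the `max_workers` argument are assumptions.

```python
import os

from dotenv import load_dotenv

from swarms import OpenAIChat, Agent, Task, ConcurrentWorkflow  # imports assumed

load_dotenv()

llm = OpenAIChat(openai_api_key=os.getenv("OPENAI_API_KEY"))
agent = Agent(llm=llm, max_loops=1)

workflow = ConcurrentWorkflow(max_workers=3)  # max_workers is an assumption
for prompt in (
    "Summarize report A",
    "Summarize report B",
    "Summarize report C",
):
    workflow.add(Task(description=prompt, agent=agent))

# All three tasks execute at the same time
workflow.run()
```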
### `RecursiveWorkflow`
- Recursively iterate on a workflow until a specific token is detected.
`RecursiveWorkflow` keeps executing the tasks until a specific stop token like `<DONE>` appears in the output text! (A sketch follows the example below.)
```python
import os
@ -235,9 +246,9 @@ workflow.run()
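A sketch based on the `RecursiveWorkflow` source included later in this commit (`stop_token`, `add`, `run`); the top-level imports and the `Task` construction follow the docs example and are otherwise assumptions.

```python
from swarms import OpenAIChat, Agent, Task, RecursiveWorkflow  # imports assumed

llm = OpenAIChat(openai_api_key="YOUR_API_KEY")
agent = Agent(llm=llm, max_loops=1)

# The task re-executes until its output contains the stop token
task = Task(description="Write a haiku, then output <DONE>", agent=agent)

workflow = RecursiveWorkflow(stop_token="<DONE>")
workflow.add(task)
workflow.run()
```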
### `ModelParallelizer`
- Concurrent Execution of Multiple Models: The ModelParallelizer allows you to run multiple models concurrently, comparing their outputs. This feature enables you to easily compare the performance and results of different models, helping you make informed decisions about which model to use for your specific task.
The ModelParallelizer allows you to run multiple models concurrently and compare their outputs. This makes it easy to compare the performance and results of different models, helping you make informed decisions about which model to use for your specific task. (A sketch follows the example below.)
- Plug-and-Play Integration: The structure provides a seamless integration with various models, including OpenAIChat, Anthropic, Mixtral, and Gemini. You can easily plug in any of these models and start using them without the need for extensive modifications or setup.
Plug-and-Play Integration: The structure provides a seamless integration with various models, including OpenAIChat, Anthropic, Mixtral, and Gemini. You can easily plug in any of these models and start using them without the need for extensive modifications or setup.
```python
@ -245,8 +256,7 @@ import os
from dotenv import load_dotenv
from swarms.models import Anthropic, Gemini, Mixtral, OpenAIChat
from swarms.structs import ModelParallelizer
from swarms import Anthropic, Gemini, Mixtral, OpenAIChat, ModelParallelizer
load_dotenv()
@ -278,7 +288,8 @@ for i in range(len(out)):
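A short sketch of the usage pattern, assuming `ModelParallelizer` wraps a list of LLMs and `run()` returns one output per model (consistent with the `for i in range(len(out))` loop above); constructor parameter names are assumptions.

```python
import os

from dotenv import load_dotenv

from swarms import Anthropic, Gemini, Mixtral, OpenAIChat, ModelParallelizer

load_dotenv()

# Any mix of models can be compared side by side
llms = [
    OpenAIChat(openai_api_key=os.getenv("OPENAI_API_KEY")),
    Anthropic(anthropic_api_key=os.getenv("ANTHROPIC_API_KEY")),  # param name assumed
]

parallelizer = ModelParallelizer(llms)

# One prompt in, one output per model
out = parallelizer.run("Compare the pros and cons of solar versus wind power")
for i in range(len(out)):
    print(f"Model {i}: {out[i]}")
```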
### Simple Conversational Agent
- Plug-and-play conversational agent with `GPT4`, `Mixtral`, or any of our models
A plug-and-play conversational agent with `GPT4`, `Mixtral`, or any of our models (a sketch of the elided `interactive_conversation` helper follows below)
- Reliable conversational structure that holds messages together, with dynamic handling for long-context conversations and interactions with auto-chunking
- Reliable: this simple system will always provide the responses you want.
@ -332,11 +343,13 @@ interactive_conversation(llm)
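The example body is elided in this diff; a plausible minimal version of the `interactive_conversation` helper it calls might look like the following. The `Conversation` struct and both of its methods used here are assumptions, not confirmed by the diff.

```python
# Hypothetical reconstruction: Conversation and its methods are assumptions.
from swarms.structs import Conversation


def interactive_conversation(llm):
    conv = Conversation()
    while True:
        user_input = input("You: ")
        if user_input.lower() == "quit":
            break
        conv.add("user", user_input)
        # Feed the accumulated history to the model each turn
        answer = llm(conv.return_history_as_string())
        conv.add("assistant", answer)
        print(f"Assistant: {answer}")
```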
### `SwarmNetwork`
- Efficient Task Management: SwarmNetwork's intelligent agent pool and task queue management system ensures tasks are distributed evenly across agents. This leads to efficient use of resources and faster task completion.
`SwarmNetwork` provides the infrastructure for building extremely dense and complex multi-agent applications that span various types of agents. (A sketch follows the example below.)
- Scalability: SwarmNetwork can dynamically scale the number of agents based on the number of pending tasks. This means it can handle an increase in workload by adding more agents, and conserve resources when the workload is low by reducing the number of agents.
✅ Efficient Task Management: SwarmNetwork's intelligent agent pool and task queue management system ensures tasks are distributed evenly across agents. This leads to efficient use of resources and faster task completion.
- Versatile Deployment Options: With SwarmNetwork, each agent can be run on its own thread, process, container, machine, or even cluster. This provides a high degree of flexibility and allows for deployment that best suits the user's needs and infrastructure.
✅ Scalability: SwarmNetwork can dynamically scale the number of agents based on the number of pending tasks. This means it can handle an increase in workload by adding more agents, and conserve resources when the workload is low by reducing the number of agents.
✅ Versatile Deployment Options: With SwarmNetwork, each agent can be run on its own thread, process, container, machine, or even cluster. This provides a high degree of flexibility and allows for deployment that best suits the user's needs and infrastructure.
```python
import os
@ -390,21 +403,17 @@ print(out)
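A sketch of the usage pattern described above; the `SwarmNetwork` constructor, `list_agents`, `run_single_agent`, `agent.id`, and the module path are assumptions inferred from this section, not confirmed by the diff.

```python
import os

from swarms import OpenAIChat, Agent
from swarms.structs.swarm_net import SwarmNetwork  # module path assumed

llm = OpenAIChat(openai_api_key=os.getenv("OPENAI_API_KEY"))

# Two workers in the pool; the network distributes tasks across them
agent1 = Agent(llm=llm, max_loops=1, agent_name="Summarizer")
agent2 = Agent(llm=llm, max_loops=1, agent_name="Researcher")

swarmnet = SwarmNetwork(agents=[agent1, agent2])
swarmnet.list_agents()  # inspect the agent pool

out = swarmnet.run_single_agent(
    agent1.id, "Summarize the pending warehouse safety reports"
)
print(out)
```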
### `Task`
Task Execution: The Task structure allows for the execution of tasks by an assigned agent. The run method is used to execute the task. It's like a Zapier for LLMs
`Task` is a simple structure for task execution with the `Agent`. Imagine Zapier for LLM-based workflow automation. (A sketch of the feature methods follows this section.)
✅ Task is a structure for task execution with the Agent.
- Task Description: Each Task can have a description, providing a human-readable explanation of what the task is intended to do.
- Task Scheduling: Tasks can be scheduled for execution at a specific time using the schedule_time attribute.
- Task Triggers: The set_trigger method allows for the setting of a trigger function that is executed before the task.
- Task Actions: The set_action method allows for the setting of an action function that is executed after the task.
- Task Conditions: The set_condition method allows for the setting of a condition function. The task will only be executed if this function returns True.
- Task Dependencies: The add_dependency method allows for the addition of dependencies to the task. The task will only be executed if all its dependencies have been completed.
- Task Priority: The set_priority method allows for the setting of the task's priority. Tasks with higher priority will be executed before tasks with lower priority.
- Task History: The history attribute is a list that keeps track of all the results of the task execution. This can be useful for debugging and for tasks that need to be executed multiple times.
✅ Tasks can have descriptions, scheduling, triggers, actions, conditions, dependencies, priority, and a history.
✅ The Task structure allows for efficient workflow automation with LLM-based agents.
```python
import os
from swarms.structs import Task, Agent
from swarms.models import OpenAIChat
from swarms import Task, Agent, OpenAIChat
from dotenv import load_dotenv
@ -568,9 +577,8 @@ print(inference)
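A sketch exercising the `Task` features listed above; the method names come from that list, and the exact signatures are assumptions.

```python
import os

from dotenv import load_dotenv

from swarms import Task, Agent, OpenAIChat

load_dotenv()

llm = OpenAIChat(openai_api_key=os.getenv("OPENAI_API_KEY"))
agent = Agent(llm=llm, max_loops=1, dashboard=False)

task = Task(agent=agent)
task.set_trigger(lambda: print("about to run"))  # executed before the task
task.set_action(lambda: print("finished"))       # executed after the task
task.set_priority(1)                             # higher priority runs first

task.execute("What's the weather in miami")
print(task.result)  # each execution's result is also kept in task.history
```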
## Real-World Deployment
### Multi-Agent Swarm for Logistics
- Swarms is a framework designed for real-world deployment; here is a demo presenting a ready-to-use Swarm for a vast array of logistics tasks.
- Swarms is designed to be modular and reliable for real-world deployments.
- Swarms is the first framework that unleashes multi-modal autonomous agents in the real world.
Here's a production-grade swarm ready for real-world deployment in factory and logistics settings like warehouses. This swarm can automate three costly and inefficient workflows: safety checks, productivity checks, and warehouse security.
```python
from swarms.structs import Agent
@ -680,7 +688,7 @@ efficiency_analysis = efficiency_agent.run(
## `Multi Modal Autonomous Agents`
- Run the agent with multiple modalities useful for various real-world tasks in manufacturing, logistics, and health.
Run the agent with multiple modalities, useful for various real-world tasks in manufacturing, logistics, and health.
```python
# Description: This is an example of how to use the Agent class to run a multi-modal workflow
@ -736,7 +744,7 @@ import os
from dotenv import load_dotenv
from swarms.models import Gemini
from swarms import Gemini
from swarms.prompts.visual_cot import VISUAL_CHAIN_OF_THOUGHT
# Load the environment variables
@ -840,7 +848,7 @@ print(image_url)
### `GPT4Vision`
```python
from swarms.models import GPT4VisionAPI
from swarms import GPT4VisionAPI
# Initialize with default API key and custom max_tokens
api = GPT4VisionAPI(max_tokens=1000)
@ -902,18 +910,18 @@ cog_agent.run("Describe this scene", "images/1.jpg")
----
## Supported Models :heavy_check_mark:
## Supported Models
Swarms supports various model providers like OpenAI, Huggingface, Anthropic, Google, Mistral, and many more.
| Provider | Provided :heavy_check_mark: | Module Name |
| Provider | Provided | Module Name |
|----------|-----------------------------|-------------|
| OpenAI | :heavy_check_mark: | OpenAIChat, OpenAITTS, GPT4VisionAPI, Dalle3 |
| Anthropic | :heavy_check_mark: | Anthropic |
| Mistral | :heavy_check_mark: | Mistral, Mixtral |
| Gemini/Palm | :heavy_check_mark: | Gemini |
| Huggingface | :heavy_check_mark: | HuggingFaceLLM |
| Modelscope | :heavy_check_mark: | Modelscope |
| Vllm | :heavy_check_mark: | vLLM |
| OpenAI | | OpenAIChat, OpenAITTS, GPT4VisionAPI, Dalle3 |
| Anthropic | | Anthropic |
| Mistral | | Mistral, Mixtral |
| Gemini/Palm | | Gemini |
| Huggingface | | HuggingFaceLLM |
| Modelscope | | Modelscope |
| Vllm | | vLLM |
---
@ -1029,6 +1037,11 @@ Help us accelerate our backlog by supporting us financially! Note, we're an open
<a href="https://polar.sh/kyegomez"><img src="https://polar.sh/embed/fund-our-backlog.svg?org=kyegomez" /></a>
## Swarm Newsletter 🤖 🤖 🤖 📧
Sign up to the Swarm newsletter to receive updates on the latest autonomous agent research papers, step-by-step guides on creating multi-agent apps, and much more Swarmie goodness 😊
[CLICK HERE TO SIGNUP](https://docs.google.com/forms/d/e/1FAIpQLSfqxI2ktPR9jkcIwzvHL0VY6tEIuVPd-P2fOWKnd6skT9j1EQ/viewform?usp=sf_link)
# License
Apache License

@ -20,52 +20,4 @@ workflow.add(task)
workflow.run()
```
Returns: None
#### Source Code:
```python
class RecursiveWorkflow(BaseStructure):
def __init__(self, stop_token: str = "<DONE>"):
"""
Args:
stop_token (str, optional): The token that indicates when to stop the workflow. Default is "<DONE>".
The stop_token indicates the value at which the current workflow is finished.
"""
self.stop_token = stop_token
self.tasks = []
assert (
self.stop_token is not None
), "stop_token cannot be None"
def add(self, task: Task, tasks: List[Task] = None):
"""Adds a task to the workflow.
Args:
task (Task): The task to be added.
tasks (List[Task], optional): List of tasks to be executed.
"""
try:
if tasks:
for task in tasks:
self.tasks.append(task)
else:
self.tasks.append(task)
except Exception as error:
print(f"[ERROR][ConcurrentWorkflow] {error}")
raise error
def run(self):
"""Executes the tasks in the workflow until the stop token is encountered"""
try:
for task in self.tasks:
while True:
result = task.execute()
if self.stop_token in result:
break
except Exception as error:
print(f"[ERROR][RecursiveWorkflow] {error}")
raise error
```
In summary, the `RecursiveWorkflow` class automates tasks by adding and executing them recursively until a stopping condition is reached, using the `add` and `run` methods. A general format for adding and utilizing the class is provided under the "Examples" section; see the Args and Source Code sections for specifics on using the class effectively.

@ -11,8 +11,8 @@
from swarms.structs import Task, Agent
from swarms.models import OpenAIChat
agent = Agent(llm=OpenAIChat(openai_api_key=""), max_loops=1, dashboard=False)
task = Task(description="What's the weather in miami", agent=agent)
task.execute()
task = Task(agent=agent)
task.execute("What's the weather in miami")
print(task.result)
# Example 2: Adding a dependency and setting priority
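# A plausible completion of Example 2; the method names are taken from the
# Task feature list in the README and should be treated as assumptions.
task2 = Task(agent=agent)
task2.add_dependency(task)  # task2 runs only after `task` has completed
task2.set_priority(1)       # higher-priority tasks execute first
task2.execute("Based on the weather above, suggest an outfit")
print(task2.result)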

@ -0,0 +1,22 @@
import os
from dotenv import load_dotenv
from swarms.agents.worker_agent import Worker
from swarms import OpenAIChat
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")
worker = Worker(
name="My Worker",
role="Worker",
human_in_the_loop=False,
tools=[],
temperature=0.5,
llm=OpenAIChat(openai_api_key=api_key),
)
out = worker.run(
"Hello, how are you? Create an image of how your are doing!"
)
print(out)

@ -1,10 +1,14 @@
from swarms import OpenAITTS
import os
from dotenv import load_dotenv
load_dotenv()
tts = OpenAITTS(
model_name="tts-1-1106",
voice="onyx",
openai_api_key="YOUR_API_KEY",
openai_api_key=os.getenv("OPENAI_API_KEY"),
)
out = tts.run_and_save("pliny is a girl and a chicken")
out = tts.run_and_save("Dammmmmm those tacos were good")
print(out)

@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "3.6.5"
version = "3.7.5"
description = "Swarms - Pytorch"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]
@ -31,7 +31,7 @@ asyncio = "3.4.3"
einops = "0.7.0"
google-generativeai = "0.3.1"
langchain-experimental = "0.0.10"
playwright = "1.34.0"
tensorflow = "*"
weaviate-client = "3.25.3"
opencv-python-headless = "4.8.1.78"
faiss-cpu = "1.7.4"
@ -44,14 +44,12 @@ PyPDF2 = "3.0.1"
accelerate = "*"
sentencepiece = "0.1.98"
wget = "3.2"
tensorflow = "2.14.0"
httpx = "0.24.1"
tiktoken = "0.4.0"
safetensors = "0.3.3"
attrs = "22.2.0"
ggl = "1.1.0"
ratelimit = "2.2.1"
beautifulsoup4 = "4.11.2"
cohere = "4.24"
huggingface-hub = "*"
pydantic = "1.10.12"
@ -71,8 +69,11 @@ pgvector = "*"
qdrant-client = "*"
sentence-transformers = "*"
peft = "*"
modelscope = "1.10.0"
psutil = "*"
ultralytics = "*"
timm = "*"
supervision = "*"
[tool.poetry.group.lint.dependencies]

@ -22,7 +22,7 @@ requests_mock
PyPDF2==3.0.1
accelerate==0.22.0
chromadb==0.4.14
tensorflow==2.14.0
tensorflow
optimum
tiktoken==0.4.0
tabulate==0.9.0
@ -45,7 +45,7 @@ openai==0.28.0
opencv-python==4.7.0.72
prettytable==3.9.0
safetensors==0.3.3
timm==0.6.13
timm
torchmetrics
webdataset
marshmallow==3.19.0
@ -59,5 +59,6 @@ mkdocs-material
mkdocs-glightbox
pre-commit==3.2.2
peft
modelscope
psutil
psutil
ultralytics
supervision

@ -2,10 +2,12 @@ from swarms.agents.message import Message
from swarms.agents.base import AbstractAgent
from swarms.agents.tool_agent import ToolAgent
from swarms.agents.simple_agent import SimpleAgent
from swarms.agents.omni_modal_agent import OmniModalAgent
__all__ = [
"Message",
"AbstractAgent",
"ToolAgent",
"SimpleAgent",
"OmniModalAgent",
]

@ -1,7 +1,4 @@
from typing import Dict, List
from langchain.base_language import BaseLanguageModel
from langchain.tools.base import BaseTool
from langchain_experimental.autonomous_agents.hugginggpt.repsonse_generator import (
load_response_generator,
)
@ -16,33 +13,6 @@ from transformers import load_tool
from swarms.agents.message import Message
class Step:
def __init__(
self,
task: str,
id: int,
dep: List[int],
args: Dict[str, str],
tool: BaseTool,
):
self.task = task
self.id = id
self.dep = dep
self.args = args
self.tool = tool
class Plan:
def __init__(self, steps: List[Step]):
self.steps = steps
def __str__(self) -> str:
return str([str(step) for step in self.steps])
def __repr(self) -> str:
return str(self)
class OmniModalAgent:
"""
OmniModalAgent

@ -0,0 +1,199 @@
import os
from typing import Any, List
import faiss
from langchain.docstore import InMemoryDocstore
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain_experimental.autonomous_agents import AutoGPT
from swarms.utils.decorators import error_decorator, timing_decorator
class Worker:
"""
The Worker class represents an autonomous agent that can perform tasks through
function calls or by running a chat.
Args:
name (str, optional): Name of the agent. Defaults to "Autobot Swarm Worker".
role (str, optional): Role of the agent. Defaults to "Worker in a swarm".
external_tools (list, optional): List of external tools. Defaults to None.
human_in_the_loop (bool, optional): Whether to include human in the loop. Defaults to False.
temperature (float, optional): Temperature for the agent. Defaults to 0.5.
llm ([type], optional): Language model. Defaults to None.
openai_api_key (str, optional): OpenAI API key. Defaults to None.
Raises:
RuntimeError: If there is an error while setting up the agent.
Example:
>>> worker = Worker(
... name="My Worker",
... role="Worker",
... external_tools=[MyTool1(), MyTool2()],
... human_in_the_loop=False,
... temperature=0.5,
... )
>>> worker.run("What's the weather in Miami?")
"""
def __init__(
self,
name: str = "WorkerAgent",
role: str = "Worker in a swarm",
external_tools=None,
human_in_the_loop: bool = False,
temperature: float = 0.5,
llm=None,
openai_api_key: str = None,
tools: List[Any] = None,
embedding_size: int = 1536,
search_kwargs: dict = {"k": 8},
*args,
**kwargs,
):
self.name = name
self.role = role
self.external_tools = external_tools
self.human_in_the_loop = human_in_the_loop
self.temperature = temperature
self.llm = llm
self.openai_api_key = openai_api_key
self.tools = tools
self.embedding_size = embedding_size
self.search_kwargs = search_kwargs
self.setup_tools(external_tools)
self.setup_memory()
self.setup_agent()
def reset(self):
"""
Reset the message history.
"""
self.message_history = []
def receive(self, name: str, message: str) -> None:
"""
Receive a message and update the message history.
Parameters:
- `name` (str): The name of the sender.
- `message` (str): The received message.
"""
self.message_history.append(f"{name}: {message}")
def send(self) -> str:
"""Send message history."""
return self.agent.run(task=self.message_history)
def setup_tools(self, external_tools):
"""
Set up tools for the worker.
Parameters:
- `external_tools` (list): List of external tools (optional).
Example:
```
external_tools = [MyTool1(), MyTool2()]
worker = Worker(model_name="gpt-4",
openai_api_key="my_key",
name="My Worker",
role="Worker",
external_tools=external_tools,
human_in_the_loop=False,
temperature=0.5)
```
"""
if self.tools is None:
self.tools = []
if external_tools is not None:
self.tools.extend(external_tools)
def setup_memory(self):
"""
Set up memory for the worker.
"""
openai_api_key = (
os.getenv("OPENAI_API_KEY") or self.openai_api_key
)
try:
embeddings_model = OpenAIEmbeddings(
openai_api_key=openai_api_key
)
embedding_size = self.embedding_size
index = faiss.IndexFlatL2(embedding_size)
self.vectorstore = FAISS(
embeddings_model.embed_query,
index,
InMemoryDocstore({}),
{},
)
except Exception as error:
raise RuntimeError(
"Error setting up memory perhaps try try tuning the"
f" embedding size: {error}"
)
def setup_agent(self):
"""
Set up the autonomous agent.
"""
try:
self.agent = AutoGPT.from_llm_and_tools(
ai_name=self.name,
ai_role=self.role,
tools=self.tools,
llm=self.llm,
memory=self.vectorstore.as_retriever(
search_kwargs=self.search_kwargs
),
human_in_the_loop=self.human_in_the_loop,
)
except Exception as error:
raise RuntimeError(f"Error setting up agent: {error}")
# @log_decorator
@error_decorator
@timing_decorator
def run(self, task: str = None, *args, **kwargs):
"""
Run the autonomous agent on a given task.
Parameters:
- `task`: The task to be processed.
Returns:
- `result`: The result of the agent's processing.
"""
try:
result = self.agent.run([task], *args, **kwargs)
return result
except Exception as error:
raise RuntimeError(f"Error while running agent: {error}")
# @log_decorator
@error_decorator
@timing_decorator
def __call__(self, task: str = None, *args, **kwargs):
"""
Make the worker callable to run the agent on a given task.
Parameters:
- `task`: The task to be processed.
Returns:
- `results`: The results of the agent's processing.
"""
try:
results = self.run(task, *args, **kwargs)
return results
except Exception as error:
raise RuntimeError(f"Error while running agent: {error}")

@ -18,10 +18,11 @@ from swarms.models.wizard_storytelling import (
) # noqa: E402
from swarms.models.mpt import MPT7B # noqa: E402
from swarms.models.mixtral import Mixtral # noqa: E402
from swarms.models.modelscope_pipeline import ModelScopePipeline
from swarms.models.modelscope_llm import (
ModelScopeAutoModel,
) # noqa: E402
# from swarms.models.modelscope_pipeline import ModelScopePipeline
# from swarms.models.modelscope_llm import (
# ModelScopeAutoModel,
# ) # noqa: E402
from swarms.models.together import TogetherLLM # noqa: E402
################# MultiModal Models
@ -39,13 +40,20 @@ from swarms.models.openai_tts import OpenAITTS # noqa: E402
from swarms.models.gemini import Gemini # noqa: E402
from swarms.models.gigabind import Gigabind # noqa: E402
from swarms.models.zeroscope import ZeroscopeTTV # noqa: E402
from swarms.models.timm import TimmModel # noqa: E402
from swarms.models.ultralytics_model import (
UltralyticsModel,
) # noqa: E402
# from swarms.models.dalle3 import Dalle3
# from swarms.models.distilled_whisperx import DistilWhisperModel # noqa: E402
# from swarms.models.whisperx_model import WhisperX # noqa: E402
# from swarms.models.kosmos_two import Kosmos # noqa: E402
from swarms.models.cog_agent import CogAgent # noqa: E402
# from swarms.models.cog_agent import CogAgent # noqa: E402
################# Tokenizers
############## Types
@ -91,8 +99,10 @@ __all__ = [
"AudioModality",
"VideoModality",
"MultimodalData",
"CogAgent",
"ModelScopePipeline",
"ModelScopeAutoModel",
# "CogAgent",
# "ModelScopePipeline",
# "ModelScopeAutoModel",
"TogetherLLM",
"TimmModel",
"UltralyticsModel",
]

@ -0,0 +1,91 @@
import supervision as sv
from ultralytics import YOLO
from tqdm import tqdm
from swarms.models.base_llm import AbstractLLM
class Odin(AbstractLLM):
"""
Odin class represents an object detection and tracking model.
Args:
source_weights_path (str): Path to the weights file for the object detection model.
target_video_path (str): Path to save the output video file.
confidence_threshold (float): Confidence threshold for object detection.
iou_threshold (float): Intersection over Union (IoU) threshold for object detection.
Attributes:
source_weights_path (str): Path to the weights file for the object detection model.
target_video_path (str): Path to save the output video file.
confidence_threshold (float): Confidence threshold for object detection.
iou_threshold (float): Intersection over Union (IoU) threshold for object detection.
"""
def __init__(
self,
source_weights_path: str = None,
target_video_path: str = None,
confidence_threshold: float = 0.3,
iou_threshold: float = 0.7,
):
super(Odin, self).__init__()
self.source_weights_path = source_weights_path
self.target_video_path = target_video_path
self.confidence_threshold = confidence_threshold
self.iou_threshold = iou_threshold
def run(self, video_path: str, *args, **kwargs):
"""
Runs the object detection and tracking algorithm on the specified video.
Args:
video_path (str): The path to the input video file.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
bool: True if the video was processed successfully, False otherwise.
"""
model = YOLO(self.source_weights_path)
tracker = sv.ByteTrack()
box_annotator = sv.BoxAnnotator()
frame_generator = sv.get_video_frames_generator(
source_path=video_path
)
video_info = sv.VideoInfo.from_video_path(
video_path=video_path
)
with sv.VideoSink(
target_path=self.target_video_path, video_info=video_info
) as sink:
for frame in tqdm(
frame_generator, total=video_info.total_frames
):
results = model(
frame,
verbose=True,
conf=self.confidence_threshold,
iou=self.iou_threshold,
)[0]
detections = sv.Detections.from_ultralytics(results)
detections = tracker.update_with_detections(
detections
)
labels = [
f"#{tracker_id} {model.model.names[class_id]}"
for _, _, _, class_id, tracker_id in detections
]
annotated_frame = box_annotator.annotate(
scene=frame.copy(),
detections=detections,
labels=labels,
)
result = sink.write_frame(frame=annotated_frame)
return result

@ -0,0 +1,422 @@
# Copyright (c) OpenMMLab. All rights reserved.
import json
import os
import os.path as osp
from collections import deque
from typing import List, Optional, Sequence, Union
import torch
from swarms.utils.get_logger import get_logger
class SentencePieceTokenizer:
"""Tokenizer of sentencepiece.
Args:
model_file (str): the path of the tokenizer model
"""
def __init__(self, model_file: str):
from sentencepiece import SentencePieceProcessor
self.model = SentencePieceProcessor(model_file=model_file)
self._prefix_space_tokens = None
# for stop words
self._maybe_decode_bytes: bool = None
# TODO maybe lack a constant.py
self._indexes_tokens_deque = deque(maxlen=10)
self.max_indexes_num = 5
self.logger = get_logger("lmdeploy")
@property
def vocab_size(self):
"""vocabulary size."""
return self.model.vocab_size()
@property
def bos_token_id(self):
"""begine of the sentence token id."""
return self.model.bos_id()
@property
def eos_token_id(self):
"""end of the sentence token id."""
return self.model.eos_id()
@property
def prefix_space_tokens(self):
"""tokens without prefix space."""
if self._prefix_space_tokens is None:
vocab = self.model.IdToPiece(list(range(self.vocab_size)))
self._prefix_space_tokens = {
i
for i, tok in enumerate(vocab)
if tok.startswith("▁")
}
return self._prefix_space_tokens
def _maybe_add_prefix_space(self, tokens, decoded):
"""maybe add prefix space for incremental decoding."""
if (
len(tokens)
and not decoded.startswith(" ")
and tokens[0] in self.prefix_space_tokens
):
return " " + decoded
else:
return decoded
def indexes_containing_token(self, token: str):
"""Return all the possible indexes, whose decoding output may contain
the input token."""
# traversing vocab is time consuming, can not be accelerated with
# multi threads (computation) or multi process (can't pickle tokenizer)
# so, we maintain latest 10 stop words and return directly if matched
for _token, _indexes in self._indexes_tokens_deque:
if token == _token:
return _indexes
if token == " ": # ' ' is special
token = ""
vocab = self.model.IdToPiece(list(range(self.vocab_size)))
indexes = [i for i, voc in enumerate(vocab) if token in voc]
if len(indexes) > self.max_indexes_num:
indexes = self.encode(token, add_bos=False)[-1:]
self.logger.warning(
f"There are too many(>{self.max_indexes_num})"
f" possible indexes may decoding {token}, we will use"
f" {indexes} only"
)
self._indexes_tokens_deque.append((token, indexes))
return indexes
def encode(self, s: str, add_bos: bool = True, **kwargs):
"""Tokenize a prompt.
Args:
s (str): a prompt
Returns:
list[int]: token ids
"""
return self.model.Encode(s, add_bos=add_bos, **kwargs)
def decode(self, t: Sequence[int], offset: Optional[int] = None):
"""De-tokenize.
Args:
t (List[int]): a list of token ids
offset (int): for incrementally decoding. Default to None, which
means not applied.
Returns:
str: text of decoding tokens
"""
if isinstance(t, torch.Tensor):
t = t.tolist()
t = t[offset:]
out_string = self.model.Decode(t)
if offset:
out_string = self._maybe_add_prefix_space(t, out_string)
return out_string
def __call__(self, s: Union[str, Sequence[str]]):
"""Tokenize prompts.
Args:
s (str): prompts
Returns:
list[int]: token ids
"""
import addict
add_bos = False
add_eos = False
input_ids = self.model.Encode(
s, add_bos=add_bos, add_eos=add_eos
)
return addict.Addict(input_ids=input_ids)
class HuggingFaceTokenizer:
"""Tokenizer of sentencepiece.
Args:
model_dir (str): the directory of the tokenizer model
"""
def __init__(self, model_dir: str):
from transformers import AutoTokenizer
model_file = osp.join(model_dir, "tokenizer.model")
backend_tokenizer_file = osp.join(model_dir, "tokenizer.json")
model_file_exists = osp.exists(model_file)
self.logger = get_logger("lmdeploy")
if (
not osp.exists(backend_tokenizer_file)
and model_file_exists
):
self.logger.warning(
"Can not find tokenizer.json. "
"It may take long time to initialize the tokenizer."
)
self.model = AutoTokenizer.from_pretrained(
model_dir, trust_remote_code=True
)
self._prefix_space_tokens = None
# save tokenizer.json to reuse
if (
not osp.exists(backend_tokenizer_file)
and model_file_exists
):
if hasattr(self.model, "backend_tokenizer"):
if os.access(model_dir, os.W_OK):
self.model.backend_tokenizer.save(
backend_tokenizer_file
)
if self.model.eos_token_id is None:
generation_config_file = osp.join(
model_dir, "generation_config.json"
)
if osp.exists(generation_config_file):
with open(generation_config_file, "r") as f:
cfg = json.load(f)
self.model.eos_token_id = cfg["eos_token_id"]
elif hasattr(self.model, "eod_id"): # Qwen remote
self.model.eos_token_id = self.model.eod_id
# for stop words
self._maybe_decode_bytes: bool = None
# TODO maybe lack a constant.py
self._indexes_tokens_deque = deque(maxlen=10)
self.max_indexes_num = 5
self.token2id = {}
@property
def vocab_size(self):
"""vocabulary size."""
return self.model.vocab_size
@property
def bos_token_id(self):
"""begine of the sentence token id."""
return self.model.bos_token_id
@property
def eos_token_id(self):
"""end of the sentence token id."""
return self.model.eos_token_id
@property
def prefix_space_tokens(self):
"""tokens without prefix space."""
if self._prefix_space_tokens is None:
vocab = self.model.convert_ids_to_tokens(
list(range(self.vocab_size))
)
self._prefix_space_tokens = {
i
for i, tok in enumerate(vocab)
if tok.startswith(
"" if isinstance(tok, str) else b" "
)
}
return self._prefix_space_tokens
def _maybe_add_prefix_space(
self, tokens: List[int], decoded: str
):
"""maybe add prefix space for incremental decoding."""
if (
len(tokens)
and not decoded.startswith(" ")
and tokens[0] in self.prefix_space_tokens
):
return " " + decoded
else:
return decoded
@property
def maybe_decode_bytes(self):
"""Check if self.model.convert_ids_to_tokens return not a str value."""
if self._maybe_decode_bytes is None:
self._maybe_decode_bytes = False
vocab = self.model.convert_ids_to_tokens(
list(range(self.vocab_size))
)
for tok in vocab:
if not isinstance(tok, str):
self._maybe_decode_bytes = True
break
return self._maybe_decode_bytes
def indexes_containing_token(self, token: str):
"""Return all the possible indexes, whose decoding output may contain
the input token."""
# traversing vocab is time consuming, can not be accelerated with
# multi threads (computation) or multi process (can't pickle tokenizer)
# so, we maintain latest 10 stop words and return directly if matched
for _token, _indexes in self._indexes_tokens_deque:
if token == _token:
return _indexes
if self.token2id == {}:
# decode is slower than convert_ids_to_tokens
if self.maybe_decode_bytes:
self.token2id = {
self.model.decode(i): i
for i in range(self.vocab_size)
}
else:
self.token2id = {
self.model.convert_ids_to_tokens(i): i
for i in range(self.vocab_size)
}
if token == " ": # ' ' is special
token = ""
indexes = [
i
for _token, i in self.token2id.items()
if token in _token
]
if len(indexes) > self.max_indexes_num:
indexes = self.encode(token, add_bos=False)[-1:]
self.logger.warning(
f"There are too many(>{self.max_indexes_num})"
f" possible indexes may decoding {token}, we will use"
f" {indexes} only"
)
self._indexes_tokens_deque.append((token, indexes))
return indexes
def encode(self, s: str, add_bos: bool = True, **kwargs):
"""Tokenize a prompt.
Args:
s (str): a prompt
Returns:
list[int]: token ids
"""
encoded = self.model.encode(s, **kwargs)
if not add_bos:
# in the middle of a session
if len(encoded) and encoded[0] == self.bos_token_id:
encoded = encoded[1:]
return encoded
def decode(self, t: Sequence[int], offset: Optional[int] = None):
"""De-tokenize.
Args:
t (List[int]): a list of token ids
offset (int): for incrementally decoding. Default to None, which
means not applied.
Returns:
str: text of decoding tokens
"""
skip_special_tokens = True
t = t[offset:]
out_string = self.model.decode(
t, skip_special_tokens=skip_special_tokens
)
if offset:
out_string = self._maybe_add_prefix_space(t, out_string)
return out_string
def __call__(self, s: Union[str, Sequence[str]]):
"""Tokenize prompts.
Args:
s (str): prompts
Returns:
list[int]: token ids
"""
add_special_tokens = False
return self.model(s, add_special_tokens=add_special_tokens)
class Tokenizer:
"""Tokenize prompts or de-tokenize tokens into texts.
Args:
model_file (str): the path of the tokenizer model
"""
def __init__(self, model_file: str):
if model_file.endswith(".model"):
model_folder = osp.split(model_file)[0]
else:
model_folder = model_file
model_file = osp.join(model_folder, "tokenizer.model")
tokenizer_config_file = osp.join(
model_folder, "tokenizer_config.json"
)
model_file_exists = osp.exists(model_file)
config_exists = osp.exists(tokenizer_config_file)
use_hf_model = config_exists or not model_file_exists
self.logger = get_logger("lmdeploy")
if not use_hf_model:
self.model = SentencePieceTokenizer(model_file)
else:
self.model = HuggingFaceTokenizer(model_folder)
@property
def vocab_size(self):
"""vocabulary size."""
return self.model.vocab_size
@property
def bos_token_id(self):
"""begine of the sentence token id."""
return self.model.bos_token_id
@property
def eos_token_id(self):
"""end of the sentence token id."""
return self.model.eos_token_id
def encode(self, s: str, add_bos: bool = True, **kwargs):
"""Tokenize a prompt.
Args:
s (str): a prompt
Returns:
list[int]: token ids
"""
return self.model.encode(s, add_bos, **kwargs)
def decode(self, t: Sequence[int], offset: Optional[int] = None):
"""De-tokenize.
Args:
t (List[int]): a list of token ids
offset (int): for incrementally decoding. Default to None, which
means not applied.
Returns:
str: text of decoding tokens
"""
return self.model.decode(t, offset)
def __call__(self, s: Union[str, Sequence[str]]):
"""Tokenize prompts.
Args:
s (str): prompts
Returns:
list[int]: token ids
"""
return self.model(s)
def indexes_containing_token(self, token):
"""Return all the possible indexes, whose decoding output may contain
the input token."""
encoded = self.encode(token, add_bos=False)
if len(encoded) > 1:
self.logger.warning(
f"The token {token}, its length of indexes"
f" {encoded} is over than 1. Currently, it can not be"
" used as stop words"
)
return []
return self.model.indexes_containing_token(token)

@ -2,59 +2,48 @@ from typing import List
import timm
import torch
from pydantic import BaseModel
from torch import Tensor
from swarms.models.base_multimodal_model import BaseMultiModalModel
class TimmModelInfo(BaseModel):
model_name: str
pretrained: bool
in_chans: int
class Config:
# Use strict typing for all fields
strict = True
class TimmModel(BaseMultiModalModel):
"""
TimmModel is a class that wraps the timm library to provide a consistent
interface for creating and running models.
Args:
model_name: A string representing the name of the model to be created.
pretrained: A boolean indicating whether to use a pretrained model.
in_chans: An integer representing the number of input channels.
class TimmModel:
"""
Returns:
A TimmModel instance.
# Usage
model_handler = TimmModelHandler()
model_info = TimmModelInfo(model_name='resnet34', pretrained=True, in_chans=1)
input_tensor = torch.randn(1, 1, 224, 224)
output_shape = model_handler(model_info=model_info, input_tensor=input_tensor)
print(output_shape)
Example:
model = TimmModel('resnet18', pretrained=True, in_chans=3)
output_shape = model(input_tensor)
"""
def __init__(self):
def __init__(
self,
model_name: str,
pretrained: bool,
in_chans: int,
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self.model_name = model_name
self.pretrained = pretrained
self.in_chans = in_chans
self.models = self._get_supported_models()
def _get_supported_models(self) -> List[str]:
"""Retrieve the list of supported models from timm."""
return timm.list_models()
def _create_model(
self, model_info: TimmModelInfo
) -> torch.nn.Module:
"""
Create a model instance from timm with specified parameters.
Args:
model_info: An instance of TimmModelInfo containing model specifications.
Returns:
An instance of a pytorch model.
"""
return timm.create_model(
model_info.model_name,
pretrained=model_info.pretrained,
in_chans=model_info.in_chans,
)
def __call__(
self, model_info: TimmModelInfo, input_tensor: torch.Tensor
) -> torch.Size:
def __call__(self, task: Tensor, *args, **kwargs) -> Tensor:
"""
Create and run the configured timm model on the input tensor `task`.
@ -65,5 +54,8 @@ class TimmModel:
Returns:
The shape of the output from the model.
"""
model = self._create_model(model_info)
return model(input_tensor).shape
model = timm.create_model(
self.model_name,
*args,
pretrained=self.pretrained,
in_chans=self.in_chans,
**kwargs,
)
return model(task)
def list_models(self):
return timm.list_models()

@ -0,0 +1,33 @@
from swarms.models.base_multimodal_model import BaseMultiModalModel
from ultralytics import YOLO
class UltralyticsModel(BaseMultiModalModel):
"""
Initializes an instance of the Ultralytics model.
Args:
model_name (str): The name of the model.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
"""
def __init__(self, model_name: str, *args, **kwargs):
super().__init__(*args, **kwargs)
self.model_name = model_name
self.model = YOLO(model_name, *args, **kwargs)
def __call__(self, task: str, *args, **kwargs):
"""
Calls the Ultralytics model.
Args:
task (str): The task to perform.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Returns:
The result of the model call.
"""
return self.model(task, *args, **kwargs)

@ -7,6 +7,7 @@ from swarms.prompts.operations_agent_prompt import (
)
from swarms.prompts.product_agent_prompt import PRODUCT_AGENT_PROMPT
from swarms.prompts.documentation import DOCUMENTATION_WRITER_SOP
from swarms.prompts.schema_generator import SchemaGenerator
__all__ = [
"CODE_INTERPRETER",
@ -16,4 +17,5 @@ __all__ = [
"OPERATIONS_AGENT_PROMPT",
"PRODUCT_AGENT_PROMPT",
"DOCUMENTATION_WRITER_SOP",
"SchemaGenerator",
]

@ -0,0 +1,214 @@
import json
from typing import List
from swarms.tools.tool import BaseTool
FINISH_NAME = "finish"
class SchemaGenerator:
"""A class for generating custom prompt strings.
It does this based on constraints, commands, resources, and performance evaluations.
Attributes:
constraints (List[str]): A list of constraints.
commands (List[BaseTool]): A list of commands.
resources (List[str]): A list of resources.
performance_evaluation (List[str]): A list of performance evaluations.
response_format (dict): A dictionary of the response format.
Examples:
>>> schema_generator = SchemaGenerator()
>>> schema_generator.add_constraint("No user assistance")
>>> schema_generator.add_resource("Internet access for searches and information gathering.")
>>> schema_generator.add_performance_evaluation("Continuously review and analyze your actions to ensure you are performing to the best of your abilities.")
>>> prompt_string = schema_generator.generate_prompt_string()
>>> print(prompt_string)
"""
def __init__(self) -> None:
"""Initialize the SchemaGenerator object.
Starts with empty lists of constraints, commands, resources,
and performance evaluations.
"""
self.constraints: List[str] = []
self.commands: List[BaseTool] = []
self.resources: List[str] = []
self.performance_evaluation: List[str] = []
self.response_format = {
"thoughts": {
"text": "thought",
"reasoning": "reasoning",
"plan": (
"- short bulleted\n- list that conveys\n-"
" long-term plan"
),
"criticism": "constructive self-criticism",
"speak": "thoughts summary to say to user",
},
"command": {
"name": "command name",
"args": {"arg name": "value"},
},
}
def add_constraint(self, constraint: str) -> None:
"""
Add a constraint to the constraints list.
Args:
constraint (str): The constraint to be added.
"""
self.constraints.append(constraint)
def add_tool(self, tool: BaseTool) -> None:
self.commands.append(tool)
def _generate_command_string(self, tool: BaseTool) -> str:
output = f"{tool.name}: {tool.description}"
output += f", args json schema: {json.dumps(tool.args)}"
return output
def add_resource(self, resource: str) -> None:
"""
Add a resource to the resources list.
Args:
resource (str): The resource to be added.
"""
self.resources.append(resource)
def add_performance_evaluation(self, evaluation: str) -> None:
"""
Add a performance evaluation item to the performance_evaluation list.
Args:
evaluation (str): The evaluation item to be added.
"""
self.performance_evaluation.append(evaluation)
def _generate_numbered_list(
self, items: list, item_type: str = "list"
) -> str:
"""
Generate a numbered list from given items based on the item_type.
Args:
items (list): A list of items to be numbered.
item_type (str, optional): The type of items in the list.
Defaults to 'list'.
Returns:
str: The formatted numbered list.
"""
if item_type == "command":
command_strings = [
f"{i + 1}. {self._generate_command_string(item)}"
for i, item in enumerate(items)
]
finish_description = (
"use this to signal that you have finished all your"
" objectives"
)
finish_args = (
'"response": "final response to let '
'people know you have finished your objectives"'
)
finish_string = (
f"{len(items) + 1}. {FINISH_NAME}: "
f"{finish_description}, args: {finish_args}"
)
return "\n".join(command_strings + [finish_string])
else:
return "\n".join(
f"{i+1}. {item}" for i, item in enumerate(items)
)
def generate_prompt_string(self) -> str:
"""Generate a prompt string.
Returns:
str: The generated prompt string.
"""
formatted_response_format = json.dumps(
self.response_format, indent=4
)
prompt_string = (
f"Constraints:\n{self._generate_numbered_list(self.constraints)}\n\nCommands:\n{self._generate_numbered_list(self.commands, item_type='command')}\n\nResources:\n{self._generate_numbered_list(self.resources)}\n\nPerformance"
f" Evaluation:\n{self._generate_numbered_list(self.performance_evaluation)}\n\nYou"
" should only respond in JSON format as described below"
" \nResponse Format:"
f" \n{formatted_response_format} \nEnsure the response"
" can be parsed by Python json.loads"
)
return prompt_string
def get_prompt(tools: List[BaseTool]) -> str:
"""Generates a prompt string.
It includes various constraints, commands, resources, and performance evaluations.
Returns:
str: The generated prompt string.
"""
# Initialize the SchemaGenerator object
schema_generator = SchemaGenerator()
# Add constraints to the SchemaGenerator object
schema_generator.add_constraint(
"~4000 word limit for short term memory. "
"Your short term memory is short, "
"so immediately save important information to files."
)
schema_generator.add_constraint(
"If you are unsure how you previously did something "
"or want to recall past events, "
"thinking about similar events will help you remember."
)
schema_generator.add_constraint("No user assistance")
schema_generator.add_constraint(
"Exclusively use the commands listed in double quotes e.g."
' "command name"'
)
# Add commands to the SchemaGenerator object
for tool in tools:
schema_generator.add_tool(tool)
# Add resources to the SchemaGenerator object
schema_generator.add_resource(
"Internet access for searches and information gathering."
)
schema_generator.add_resource("Long Term memory management.")
schema_generator.add_resource(
"GPT-3.5 powered Agents for delegation of simple tasks."
)
schema_generator.add_resource("File output.")
# Add performance evaluations to the SchemaGenerator object
schema_generator.add_performance_evaluation(
"Continuously review and analyze your actions "
"to ensure you are performing to the best of your abilities."
)
schema_generator.add_performance_evaluation(
"Constructively self-criticize your big-picture behavior"
" constantly."
)
schema_generator.add_performance_evaluation(
"Reflect on past decisions and strategies to refine your"
" approach."
)
schema_generator.add_performance_evaluation(
"Every command has a cost, so be smart and efficient. "
"Aim to complete tasks in the least number of steps."
)
# Generate the prompt string
prompt_string = schema_generator.generate_prompt_string()
return prompt_string

@ -0,0 +1,60 @@
def worker_agent_system(name: str, memory: str = None):
return """
You are {name},
Your decisions must always be made independently without seeking user assistance.
Play to your strengths as an LLM and pursue simple strategies with no legal complications.
If you have completed all your tasks, make sure to use the "finish" command.
GOALS:
1. Hello, how are you? Create an image of how you are doing!
Constraints:
1. ~4000 word limit for short term memory. Your short term memory is short, so immediately save important information to files.
2. If you are unsure how you previously did something or want to recall past events, thinking about similar events will help you remember.
3. No user assistance
4. Exclusively use the commands listed in double quotes e.g. "command name"
Commands:
1. finish: use this to signal that you have finished all your objectives, args: "response": "final response to let people know you have finished your objectives"
Resources:
1. Internet access for searches and information gathering.
2. Long Term memory management.
3. GPT-3.5 powered Agents for delegation of simple tasks.
4. File output.
Performance Evaluation:
1. Continuously review and analyze your actions to ensure you are performing to the best of your abilities.
2. Constructively self-criticize your big-picture behavior constantly.
3. Reflect on past decisions and strategies to refine your approach.
4. Every command has a cost, so be smart and efficient. Aim to complete tasks in the least number of steps.
You should only respond in JSON format as described below
Response Format:
{{
"thoughts": {{
"text": "thought",
"reasoning": "reasoning",
"plan": "- short bulleted\n- list that conveys\n- long-term plan",
"criticism": "constructive self-criticism",
"speak": "thoughts summary to say to user"
}},
"command": {{
"name": "command name",
"args": {{
"arg name": "value"
}}
}}
}}
Ensure the response can be parsed by Python json.loads
System: The current time and date is Sat Jan 20 10:39:07 2024
System: This reminds you of these events from your past:
[{memory}]
Human: Determine which next command to use, and respond using the format specified above:
""".format(name=name, memory=memory)

@ -30,6 +30,8 @@ from swarms.structs.utils import (
from swarms.structs.task import Task
from swarms.structs.block_wrapper import block
from swarms.structs.graph_workflow import GraphWorkflow
from swarms.structs.step import Step
from swarms.structs.plan import Plan
__all__ = [
@ -62,4 +64,6 @@ __all__ = [
"Task",
"block",
"GraphWorkflow",
"Step",
"Plan",
]

@ -0,0 +1,32 @@
from typing import List
from swarms.structs.step import Step
class Plan:
def __init__(self, steps: List[Step]):
"""
Initializes a Plan object.
Args:
steps (List[Step]): A list of Step objects representing the steps in the plan.
"""
self.steps = steps
def __str__(self) -> str:
"""
Returns a string representation of the Plan object.
Returns:
str: A string representation of the Plan object.
"""
return str([str(step) for step in self.steps])
def __repr__(self) -> str:
"""
Returns a string representation of the Plan object.
Returns:
str: A string representation of the Plan object.
"""
return str(self)

@ -0,0 +1,24 @@
from dataclasses import dataclass
from typing import Dict, List
from swarms.tools.tool import BaseTool
@dataclass
class Step:
"""
Represents a step in a process.
Attributes:
task (str): The task associated with the step.
id (int): The unique identifier of the step.
dep (List[int]): The list of step IDs that this step depends on.
args (Dict[str, str]): The arguments associated with the step.
tool (BaseTool): The tool used to execute the step.
"""
task: str
id: int
dep: List[int]
args: Dict[str, str]
tool: BaseTool

@ -109,11 +109,11 @@ class Task:
except Exception as error:
logger.error(f"[ERROR][Task] {error}")
def run(self):
self.execute()
def run(self, task: str, *args, **kwargs):
self.execute(task, *args, **kwargs)
def __call__(self):
self.execute()
def __call__(self, task: str, *args, **kwargs):
self.execute(task, *args, **kwargs)
def handle_scheduled_task(self):
"""

@ -7,7 +7,13 @@ from swarms.tools.tool_utils import (
execute_tools,
)
from swarms.tools.tool import BaseTool, Tool, StructuredTool, tool
from swarms.tools.exec_tool import (
AgentAction,
BaseAgentOutputParser,
preprocess_json_input,
AgentOutputParser,
execute_tool_by_name,
)
__all__ = [
"scrape_tool_func_docs",
@ -20,4 +26,9 @@ __all__ = [
"Tool",
"StructuredTool",
"tool",
"AgentAction",
"BaseAgentOutputParser",
"preprocess_json_input",
"AgentOutputParser",
"execute_tool_by_name",
]

@ -0,0 +1,125 @@
import json
import re
from abc import abstractmethod
from typing import Dict, List, NamedTuple
from langchain.schema import BaseOutputParser
from pydantic import ValidationError
from swarms.tools.tool import BaseTool
class AgentAction(NamedTuple):
"""Action returned by AgentOutputParser."""
name: str
args: Dict
class BaseAgentOutputParser(BaseOutputParser):
"""Base Output parser for Agent."""
@abstractmethod
def parse(self, text: str) -> AgentAction:
"""Return AgentAction"""
def preprocess_json_input(input_str: str) -> str:
"""Preprocesses a string to be parsed as json.
Replace single backslashes with double backslashes,
while leaving already escaped ones intact.
Args:
input_str: String to be preprocessed
Returns:
Preprocessed string
"""
corrected_str = re.sub(
r'(?<!\\)\\(?!["\\/bfnrt]|u[0-9a-fA-F]{4})',
r"\\\\",
input_str,
)
return corrected_str
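# Illustrative example: preprocess_json_input(r'{"dir": "C:\Users"}') returns
# r'{"dir": "C:\\Users"}', which json.loads can parse, while already-valid
# escapes such as \n, \" or \\ are left untouched by the lookbehind/lookahead.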
class AgentOutputParser(BaseAgentOutputParser):
"""Output parser for Agent."""
def parse(self, text: str) -> AgentAction:
try:
parsed = json.loads(text, strict=False)
except json.JSONDecodeError:
preprocessed_text = preprocess_json_input(text)
try:
parsed = json.loads(preprocessed_text, strict=False)
except Exception:
return AgentAction(
name="ERROR",
args={
"error": (
f"Could not parse invalid json: {text}"
)
},
)
try:
return AgentAction(
name=parsed["command"]["name"],
args=parsed["command"]["args"],
)
except (KeyError, TypeError):
# If the command is null or incomplete, return an erroneous tool
return AgentAction(
name="ERROR",
args={"error": f"Incomplete command args: {parsed}"},
)
def execute_tool_by_name(
text: str,
tools: List[BaseTool],
stop_token: str = "finish",
):
"""
Executes a tool based on the given text command.
Args:
text (str): The text command to be executed.
tools (List[BaseTool]): A list of available tools.
stop_token (str, optional): The stop token to terminate the execution. Defaults to "finish".
Returns:
str: The result of the command execution.
"""
output_parser = AgentOutputParser()
# Get command name and arguments
action = output_parser.parse(text)
tools = {t.name: t for t in tools}
if action.name == stop_token:
return action.args["response"]
if action.name in tools:
tool = tools[action.name]
try:
observation = tool.run(action.args)
except ValidationError as e:
observation = (
f"Validation Error in args: {str(e)}, args:"
f" {action.args}"
)
except Exception as e:
observation = (
f"Error: {str(e)}, {type(e).__name__}, args:"
f" {action.args}"
)
result = f"Command {tool.name} returned: {observation}"
elif action.name == "ERROR":
result = f"Error: {action.args}. "
else:
result = (
f"Unknown command '{action.name}'. "
"Please refer to the 'COMMANDS' list for available "
"commands and only respond in the specified JSON format."
)
return result
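# Illustrative usage (the search tool is hypothetical):
#   text = '{"command": {"name": "search", "args": {"query": "llm agents"}}}'
#   execute_tool_by_name(text, tools=[search_tool])
#   -> "Command search returned: <observation from search_tool.run(...)>"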

@ -0,0 +1,130 @@
import logging
from typing import List, Optional
logger_initialized = {}
def get_logger(
name: str,
log_file: Optional[str] = None,
log_level: int = logging.INFO,
file_mode: str = "w",
):
"""Initialize and get a logger by name.
If the logger has not been initialized, this method will initialize the
logger by adding one or two handlers, otherwise the initialized logger will
be directly returned. During initialization, a StreamHandler will always be
added. If `log_file` is specified, a FileHandler will also be added.
Args:
name (str): Logger name.
log_file (str | None): The log filename. If specified, a FileHandler
will be added to the logger.
log_level (int): The logger level.
file_mode (str): The file mode used in opening log file.
Defaults to 'w'.
Returns:
logging.Logger: The expected logger.
"""
# use logger in mmengine if exists.
try:
from mmengine.logging import MMLogger
if MMLogger.check_instance_created(name):
logger = MMLogger.get_instance(name)
else:
logger = MMLogger.get_instance(
name,
logger_name=name,
log_file=log_file,
log_level=log_level,
file_mode=file_mode,
)
return logger
except Exception:
pass
logger = logging.getLogger(name)
if name in logger_initialized:
return logger
# handle hierarchical names
# e.g., logger "a" is initialized, then logger "a.b" will skip the
# initialization since it is a child of "a".
for logger_name in logger_initialized:
if name.startswith(logger_name):
return logger
# handle duplicate logs to the console
for handler in logger.root.handlers:
if type(handler) is logging.StreamHandler:
handler.setLevel(logging.ERROR)
stream_handler = logging.StreamHandler()
handlers = [stream_handler]
if log_file is not None:
# Here, the default behaviour of the official logger is 'a'. Thus, we
# provide an interface to change the file mode to the default
# behaviour.
file_handler = logging.FileHandler(log_file, file_mode)
handlers.append(file_handler)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
for handler in handlers:
handler.setFormatter(formatter)
handler.setLevel(log_level)
logger.addHandler(handler)
logger.setLevel(log_level)
logger_initialized[name] = True
return logger
def filter_suffix(
response: str, suffixes: Optional[List[str]] = None
) -> str:
"""Filter response with suffixes.
Args:
response (str): generated response by LLMs.
suffixes (str): a list of suffixes to be deleted.
Return:
str: a clean response.
"""
if suffixes is None:
return response
for item in suffixes:
if response.endswith(item):
response = response[: len(response) - len(item)]
return response
# TODO remove stop_word_offsets stuff and make it clean
def _stop_words(stop_words: List[str], tokenizer: object):
"""return list of stop-words to numpy.ndarray."""
import numpy as np
if stop_words is None:
return None
assert isinstance(stop_words, List) and all(
isinstance(elem, str) for elem in stop_words
), f"stop_words must be a list but got {type(stop_words)}"
stop_indexes = []
for stop_word in stop_words:
stop_indexes += tokenizer.indexes_containing_token(stop_word)
assert isinstance(stop_indexes, List) and all(
isinstance(elem, int) for elem in stop_indexes
), "invalid stop_words"
# each id in stop_indexes represents a stop word
# refer to https://github.com/fauxpilot/fauxpilot/discussions/165 for
# detailed explanation about fastertransformer's stop_indexes
stop_word_offsets = range(1, len(stop_indexes) + 1)
stop_words = np.array([[stop_indexes, stop_word_offsets]]).astype(
np.int32
)
return stop_words

@ -0,0 +1,43 @@
from unittest.mock import patch
from swarms.models import TimmModel
import torch
def test_timm_model_init():
with patch("swarms.models.timm.list_models") as mock_list_models:
model_name = "resnet18"
pretrained = True
in_chans = 3
timm_model = TimmModel(model_name, pretrained, in_chans)
mock_list_models.assert_called_once()
assert timm_model.model_name == model_name
assert timm_model.pretrained == pretrained
assert timm_model.in_chans == in_chans
assert timm_model.models == mock_list_models.return_value
def test_timm_model_call():
with patch(
"swarms.models.timm.create_model"
) as mock_create_model:
model_name = "resnet18"
pretrained = True
in_chans = 3
timm_model = TimmModel(model_name, pretrained, in_chans)
task = torch.rand(1, in_chans, 224, 224)
result = timm_model(task)
mock_create_model.assert_called_once_with(
model_name, pretrained=pretrained, in_chans=in_chans
)
assert result == mock_create_model.return_value(task)
def test_timm_model_list_models():
with patch("swarms.models.timm.list_models") as mock_list_models:
model_name = "resnet18"
pretrained = True
in_chans = 3
timm_model = TimmModel(model_name, pretrained, in_chans)
result = timm_model.list_models()
mock_list_models.assert_called_once()
assert result == mock_list_models.return_value

@ -0,0 +1,34 @@
from unittest.mock import patch
from swarms.models.ultralytics_model import UltralyticsModel
def test_ultralytics_init():
with patch("swarms.models.YOLO") as mock_yolo:
model_name = "yolov5s"
ultralytics = Ultralytics(model_name)
mock_yolo.assert_called_once_with(model_name)
assert ultralytics.model_name == model_name
assert ultralytics.model == mock_yolo.return_value
def test_ultralytics_call():
with patch("swarms.models.YOLO") as mock_yolo:
model_name = "yolov5s"
ultralytics = Ultralytics(model_name)
task = "detect"
args = (1, 2, 3)
kwargs = {"a": "A", "b": "B"}
result = ultralytics(task, *args, **kwargs)
mock_yolo.return_value.assert_called_once_with(
task, *args, **kwargs
)
assert result == mock_yolo.return_value.return_value
def test_ultralytics_list_models():
with patch("swarms.models.YOLO") as mock_yolo:
model_name = "yolov5s"
ultralytics = Ultralytics(model_name)
result = ultralytics.list_models()
mock_yolo.list_models.assert_called_once()
assert result == mock_yolo.list_models.return_value