[EXAMPLES CLEANUP] [FIX][SwarmMatcher] [Remove old examples]

pull/1051/head
Kye Gomez 1 week ago
parent e8f161beea
commit adb6930439

@@ -1,311 +0,0 @@
import torch
from torch import Tensor
from loguru import logger
from typing import Tuple
import matplotlib.pyplot as plt
try:
# ipywidgets is available in interactive environments like Jupyter.
from ipywidgets import interact, IntSlider
HAS_IPYWIDGETS = True
except ImportError:
HAS_IPYWIDGETS = False
logger.warning(
"ipywidgets not installed. Interactive slicing will be disabled."
)
class GaussianSplat4DStateSpace:
"""
4D Gaussian splatting with a state space model in PyTorch.
Each Gaussian is defined by an 8D state vector:
[x, y, z, w, vx, vy, vz, vw],
where the first four dimensions are the spatial coordinates and the last
four are the velocities. Only the spatial (first four) dimensions are used
for the 4D Gaussian splat, with a corresponding 4×4 covariance matrix.
Attributes:
num_gaussians (int): Number of Gaussians.
state_dim (int): Dimension of the state vector (should be 8).
states (Tensor): Current state for each Gaussian of shape (num_gaussians, state_dim).
covariances (Tensor): Covariance matrices for the spatial dimensions, shape (num_gaussians, 4, 4).
A (Tensor): State transition matrix of shape (state_dim, state_dim).
dt (float): Time step for state updates.
"""
def __init__(
self,
num_gaussians: int,
init_states: Tensor,
init_covariances: Tensor,
dt: float = 1.0,
) -> None:
"""
Initialize the 4D Gaussian splat model.
Args:
num_gaussians (int): Number of Gaussians.
init_states (Tensor): Initial states of shape (num_gaussians, 8).
Each state is assumed to be
[x, y, z, w, vx, vy, vz, vw].
init_covariances (Tensor): Initial covariance matrices for the spatial dimensions,
shape (num_gaussians, 4, 4).
dt (float): Time step for the state update.
"""
if init_states.shape[1] != 8:
raise ValueError(
"init_states should have shape (N, 8) where 8 = 4 position + 4 velocity."
)
if init_covariances.shape[1:] != (4, 4):
raise ValueError(
"init_covariances should have shape (N, 4, 4)."
)
self.num_gaussians = num_gaussians
self.states = init_states.clone() # shape: (N, 8)
self.covariances = (
init_covariances.clone()
) # shape: (N, 4, 4)
self.dt = dt
self.state_dim = init_states.shape[1]
# Create an 8x8 constant-velocity state transition matrix:
# New position = position + velocity*dt, velocity remains unchanged.
I4 = torch.eye(
4, dtype=init_states.dtype, device=init_states.device
)
zeros4 = torch.zeros(
(4, 4), dtype=init_states.dtype, device=init_states.device
)
top = torch.cat([I4, dt * I4], dim=1)
bottom = torch.cat([zeros4, I4], dim=1)
self.A = torch.cat([top, bottom], dim=0) # shape: (8, 8)
        logger.info(
            "Initialized GaussianSplat4DStateSpace with {} Gaussians.",
            num_gaussians,
        )
def update_states(self) -> None:
"""
Update the state of each Gaussian using the constant-velocity state space model.
Applies:
state_next = A @ state_current.
"""
self.states = (
self.A @ self.states.t()
).t() # shape: (num_gaussians, 8)
logger.debug("States updated: {}", self.states)
def _compute_gaussian(
self, pos: Tensor, cov: Tensor, coords: Tensor
) -> Tensor:
"""
Compute the 4D Gaussian function over a grid of coordinates.
Args:
pos (Tensor): The center of the Gaussian (4,).
cov (Tensor): The 4×4 covariance matrix.
coords (Tensor): A grid of coordinates of shape (..., 4).
Returns:
Tensor: Evaluated Gaussian values on the grid with shape equal to coords.shape[:-1].
"""
try:
cov_inv = torch.linalg.inv(cov)
except RuntimeError as e:
logger.warning(
"Covariance inversion failed; using pseudo-inverse. Error: {}",
e,
)
cov_inv = torch.linalg.pinv(cov)
# Broadcast pos over the grid
diff = coords - pos.view(
*(1 for _ in range(coords.ndim - 1)), 4
)
mahal = torch.einsum("...i,ij,...j->...", diff, cov_inv, diff)
gaussian = torch.exp(-0.5 * mahal)
return gaussian
def render(
self,
canvas_size: Tuple[int, int, int, int],
sigma_scale: float = 1.0,
normalize: bool = False,
) -> Tensor:
"""
Render the current 4D Gaussian splats onto a 4D canvas.
Args:
canvas_size (Tuple[int, int, int, int]): The size of the canvas (d1, d2, d3, d4).
sigma_scale (float): Scaling factor for the covariance (affects spread).
normalize (bool): Whether to normalize the final canvas to [0, 1].
Returns:
Tensor: A 4D tensor (canvas) with the accumulated contributions from all Gaussians.
"""
d1, d2, d3, d4 = canvas_size
# Create coordinate grids for each dimension.
grid1 = torch.linspace(
0, d1 - 1, d1, device=self.states.device
)
grid2 = torch.linspace(
0, d2 - 1, d2, device=self.states.device
)
grid3 = torch.linspace(
0, d3 - 1, d3, device=self.states.device
)
grid4 = torch.linspace(
0, d4 - 1, d4, device=self.states.device
)
# Create a 4D meshgrid (using indexing "ij")
grid = torch.stack(
torch.meshgrid(grid1, grid2, grid3, grid4, indexing="ij"),
dim=-1,
) # shape: (d1, d2, d3, d4, 4)
# Initialize the canvas.
canvas = torch.zeros(
(d1, d2, d3, d4),
dtype=self.states.dtype,
device=self.states.device,
)
for i in range(self.num_gaussians):
pos = self.states[i, :4] # spatial center (4,)
cov = (
self.covariances[i] * sigma_scale
) # scaled covariance
gaussian = self._compute_gaussian(pos, cov, grid)
canvas += gaussian
logger.debug(
"Rendered Gaussian {} at position {}", i, pos.tolist()
)
if normalize:
max_val = canvas.max()
if max_val > 0:
canvas = canvas / max_val
logger.debug("Canvas normalized.")
logger.info("4D Rendering complete.")
return canvas
def interactive_slice(canvas: Tensor) -> None:
"""
Display an interactive 2D slice of the 4D canvas using ipywidgets.
This function fixes two of the four dimensions (d3 and d4) via sliders and
displays the resulting 2D slice (over dimensions d1 and d2).
Args:
canvas (Tensor): A 4D tensor with shape (d1, d2, d3, d4).
"""
d1, d2, d3, d4 = canvas.shape
def display_slice(slice_d3: int, slice_d4: int):
slice_2d = canvas[:, :, slice_d3, slice_d4].cpu().numpy()
plt.figure(figsize=(6, 6))
plt.imshow(slice_2d, cmap="hot", origin="lower")
plt.title(f"2D Slice at d3={slice_d3}, d4={slice_d4}")
plt.colorbar()
plt.show()
interact(
display_slice,
slice_d3=IntSlider(min=0, max=d3 - 1, step=1, value=d3 // 2),
slice_d4=IntSlider(min=0, max=d4 - 1, step=1, value=d4 // 2),
)
def mip_projection(canvas: Tensor) -> None:
"""
Render a 2D view of the 4D canvas using maximum intensity projection (MIP)
along the 3rd and 4th dimensions.
Args:
canvas (Tensor): A 4D tensor with shape (d1, d2, d3, d4).
"""
# MIP along dimension 3
mip_3d = canvas.max(dim=2)[0] # shape: (d1, d2, d4)
# MIP along dimension 4
mip_2d = mip_3d.max(dim=2)[0] # shape: (d1, d2)
plt.figure(figsize=(6, 6))
plt.imshow(mip_2d.cpu().numpy(), cmap="hot", origin="lower")
plt.title("2D MIP (Projecting dimensions d3 and d4)")
plt.colorbar()
plt.show()
def main() -> None:
"""
Main function that:
- Creates a 4D Gaussian splat model.
- Updates the states to simulate motion.
- Renders a 4D canvas.
- Visualizes the 4D volume via interactive slicing (if available) or MIP.
"""
torch.manual_seed(42)
num_gaussians = 2
# Define initial states for each Gaussian:
# Each state is [x, y, z, w, vx, vy, vz, vw].
init_states = torch.tensor(
[
[10.0, 15.0, 20.0, 25.0, 0.5, -0.2, 0.3, 0.1],
[30.0, 35.0, 40.0, 45.0, -0.3, 0.4, -0.1, 0.2],
],
dtype=torch.float32,
)
# Define initial 4x4 covariance matrices for the spatial dimensions.
init_covariances = torch.stack(
[
torch.diag(
torch.tensor(
[5.0, 5.0, 5.0, 5.0], dtype=torch.float32
)
),
torch.diag(
torch.tensor(
[3.0, 3.0, 3.0, 3.0], dtype=torch.float32
)
),
]
)
# Create the 4D Gaussian splat model.
model = GaussianSplat4DStateSpace(
num_gaussians, init_states, init_covariances, dt=1.0
)
# Update states to simulate one time step.
model.update_states()
# Render the 4D canvas.
canvas_size = (20, 20, 20, 20)
canvas = model.render(
canvas_size, sigma_scale=1.0, normalize=True
)
# Visualize the 4D data.
if HAS_IPYWIDGETS:
logger.info("Launching interactive slicing tool for 4D data.")
interactive_slice(canvas)
else:
logger.info(
"ipywidgets not available; using maximum intensity projection instead."
)
mip_projection(canvas)
if __name__ == "__main__":
main()

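For reference, the removed example's "state space" update is nothing more than a block-matrix multiply: new position = position + dt * velocity, with velocity unchanged. A minimal standalone check of the transition matrix built in __init__ above (assumes only torch):

import torch

dt = 1.0
I4 = torch.eye(4)
# A = [[I, dt*I], [0, I]], the 8x8 constant-velocity transition matrix
A = torch.cat(
    [
        torch.cat([I4, dt * I4], dim=1),
        torch.cat([torch.zeros(4, 4), I4], dim=1),
    ],
    dim=0,
)
state = torch.tensor([10.0, 15.0, 20.0, 25.0, 0.5, -0.2, 0.3, 0.1])
next_state = A @ state
assert torch.allclose(next_state[:4], state[:4] + dt * state[4:])  # position advanced
assert torch.allclose(next_state[4:], state[4:])  # velocity unchanged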
@@ -1,46 +0,0 @@
from swarms.utils.vllm_wrapper import VLLMWrapper
def main():
# Initialize the vLLM wrapper with a model
# Note: You'll need to have the model downloaded or specify a HuggingFace model ID
llm = VLLMWrapper(
model_name="meta-llama/Llama-2-7b-chat-hf", # Replace with your model path or HF model ID
temperature=0.7,
max_tokens=1000,
)
# Example task
task = "What are the benefits of using vLLM for inference?"
# Run inference
response = llm.run(task)
print("Response:", response)
# Example with system prompt
llm_with_system = VLLMWrapper(
model_name="meta-llama/Llama-2-7b-chat-hf", # Replace with your model path or HF model ID
system_prompt="You are a helpful AI assistant that provides concise answers.",
temperature=0.7,
)
# Run inference with system prompt
response = llm_with_system.run(task)
print("\nResponse with system prompt:", response)
# Example with batched inference
tasks = [
"What is vLLM?",
"How does vLLM improve inference speed?",
"What are the main features of vLLM?",
]
responses = llm.batched_run(tasks, batch_size=2)
print("\nBatched responses:")
for task, response in zip(tasks, responses):
print(f"\nTask: {task}")
print(f"Response: {response}")
if __name__ == "__main__":
main()

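One caveat on the batched example above: zipping tasks with responses is only meaningful if batched_run returns results in input order, not completion order. A minimal order-preserving sketch using only the standard library (executor.map yields results in submission order; the lambda is a stand-in for llm.run):

import concurrent.futures

def batched_run_ordered(run_fn, tasks, max_workers=4):
    # map() blocks until each result is ready, in the order tasks were submitted
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as ex:
        return list(ex.map(run_fn, tasks))

print(batched_run_ordered(lambda t: t.upper(), ["a", "b", "c"]))  # ['A', 'B', 'C']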
@@ -1,148 +0,0 @@
import concurrent.futures
import os
from typing import Any
from loguru import logger
try:
from vllm import LLM, SamplingParams
except ImportError:
import subprocess
import sys
print("Installing vllm")
subprocess.check_call(
[sys.executable, "-m", "pip", "install", "-U", "vllm"]
)
print("vllm installed")
from vllm import LLM, SamplingParams
class VLLMWrapper:
"""
A wrapper class for vLLM that provides a similar interface to LiteLLM.
This class handles model initialization and inference using vLLM.
"""
def __init__(
self,
model_name: str = "meta-llama/Llama-2-7b-chat-hf",
system_prompt: str | None = None,
stream: bool = False,
temperature: float = 0.5,
max_tokens: int = 4000,
max_completion_tokens: int = 4000,
tools_list_dictionary: list[dict[str, Any]] | None = None,
tool_choice: str = "auto",
parallel_tool_calls: bool = False,
*args,
**kwargs,
):
"""
Initialize the vLLM wrapper with the given parameters.
Args:
model_name (str): The name of the model to use. Defaults to "meta-llama/Llama-2-7b-chat-hf".
system_prompt (str, optional): The system prompt to use. Defaults to None.
stream (bool): Whether to stream the output. Defaults to False.
temperature (float): The temperature for sampling. Defaults to 0.5.
max_tokens (int): The maximum number of tokens to generate. Defaults to 4000.
max_completion_tokens (int): The maximum number of completion tokens. Defaults to 4000.
tools_list_dictionary (List[Dict[str, Any]], optional): List of available tools. Defaults to None.
tool_choice (str): How to choose tools. Defaults to "auto".
parallel_tool_calls (bool): Whether to allow parallel tool calls. Defaults to False.
"""
self.model_name = model_name
self.system_prompt = system_prompt
self.stream = stream
self.temperature = temperature
self.max_tokens = max_tokens
self.max_completion_tokens = max_completion_tokens
self.tools_list_dictionary = tools_list_dictionary
self.tool_choice = tool_choice
self.parallel_tool_calls = parallel_tool_calls
# Initialize vLLM
self.llm = LLM(model=model_name, **kwargs)
self.sampling_params = SamplingParams(
temperature=temperature,
max_tokens=max_tokens,
)
def _prepare_prompt(self, task: str) -> str:
"""
Prepare the prompt for the given task.
Args:
task (str): The task to prepare the prompt for.
Returns:
str: The prepared prompt.
"""
if self.system_prompt:
return f"{self.system_prompt}\n\nUser: {task}\nAssistant:"
return f"User: {task}\nAssistant:"
def run(self, task: str, *args, **kwargs) -> str:
"""
Run the model for the given task.
Args:
task (str): The task to run the model for.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
str: The model's response.
"""
try:
prompt = self._prepare_prompt(task)
outputs = self.llm.generate(prompt, self.sampling_params)
response = outputs[0].outputs[0].text.strip()
return response
except Exception as error:
logger.error(f"Error in VLLMWrapper: {error}")
raise error
def __call__(self, task: str, *args, **kwargs) -> str:
"""
Call the model for the given task.
Args:
task (str): The task to run the model for.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
str: The model's response.
"""
return self.run(task, *args, **kwargs)
def batched_run(
self, tasks: list[str], batch_size: int = 10
) -> list[str]:
"""
Run the model for multiple tasks in batches.
Args:
tasks (List[str]): List of tasks to run.
batch_size (int): Size of each batch. Defaults to 10.
Returns:
List[str]: List of model responses.
"""
# Calculate the worker count based on 95% of available CPU cores
num_workers = max(1, int((os.cpu_count() or 1) * 0.95))
with concurrent.futures.ThreadPoolExecutor(
max_workers=num_workers
) as executor:
futures = [
executor.submit(self.run, task) for task in tasks
]
            # Collect results in submission order (not as_completed order) so
            # responses line up with the input tasks when callers zip them.
            return [future.result() for future in futures]

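For context, the wrapper above delegates to a small vLLM surface: LLM for model loading, SamplingParams plus generate for decoding. A minimal direct-use sketch (the model id is a placeholder; any local path or HuggingFace id works):

from vllm import LLM, SamplingParams

llm = LLM(model="facebook/opt-125m")  # small model, convenient for smoke tests
params = SamplingParams(temperature=0.7, max_tokens=64)
outputs = llm.generate(["What is vLLM?"], params)
# Each RequestOutput holds the prompt and one or more completions
print(outputs[0].outputs[0].text.strip())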
@@ -7,15 +7,6 @@ of the Talk Structurally, Act Hierarchically framework.
All components are now in one file: hierarchical_structured_communication_framework.py
"""
-import os
-import sys
-
-# Add the project root to the Python path
-project_root = os.path.abspath(
-    os.path.join(os.path.dirname(__file__), "..", "..")
-)
-sys.path.insert(0, project_root)
from dotenv import load_dotenv
# Import everything from the single file

@@ -11,9 +11,6 @@ import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union, Any
import warnings
-
-warnings.filterwarnings("ignore")
-
import cv2
import numpy as np
import torch
@@ -22,6 +19,10 @@ from PIL import Image
import open3d as o3d
from loguru import logger
+
+warnings.filterwarnings("ignore")
+
# Third-party model imports
try:
    import timm

@@ -1,5 +1,6 @@
from swarms.structs.agent import Agent
from swarms.structs.agent_builder import AgentsBuilder
+from swarms.structs.agent_loader import AgentLoader
from swarms.structs.agent_rearrange import AgentRearrange, rearrange
from swarms.structs.auto_swarm_builder import AutoSwarmBuilder
from swarms.structs.base_structure import BaseStructure
@@ -103,7 +104,6 @@ from swarms.structs.swarming_architectures import (
    staircase_swarm,
    star_swarm,
)
-from swarms.structs.agent_loader import AgentLoader
__all__ = [
    "Agent",

@@ -1026,16 +1026,16 @@ class Agent:
            self.short_memory.add(
                role="system",
                content=(
-                    f"🔍 [RAG Query Initiated]\n"
-                    f"━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n"
-                    f"📝 Query:\n{query}\n\n"
-                    f"📚 Retrieved Knowledge (RAG Output):\n{output}\n"
-                    f"━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n"
-                    f"💡 The above information was retrieved from the agent's long-term memory using Retrieval-Augmented Generation (RAG). "
-                    f"Use this context to inform your next response or reasoning step."
+                    "[RAG Query Initiated]\n"
+                    "----------------------------------\n"
+                    f"Query:\n{query}\n\n"
+                    f"Retrieved Knowledge (RAG Output):\n{output}\n"
+                    "----------------------------------\n"
+                    "The above information was retrieved from the agent's long-term memory using Retrieval-Augmented Generation (RAG). "
+                    "Use this context to inform your next response or reasoning step."
                ),
            )
-        except Exception as e:
+        except AgentMemoryError as e:
            logger.error(
                f"Agent: {self.agent_name} Error handling RAG query: {e} Traceback: {traceback.format_exc()}"
            )

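The narrowed except clause above assumes an AgentMemoryError type defined elsewhere in swarms; the class body below is a hypothetical stand-in, shown only to illustrate the catch-narrow-exceptions pattern this change adopts:

class AgentMemoryError(Exception):
    """Hypothetical stand-in: raised when a long-term-memory (RAG) lookup fails."""

def query_long_term_memory(query: str) -> str:
    raise AgentMemoryError("vector store unreachable")

try:
    query_long_term_memory("example query")
except AgentMemoryError as e:
    # Only memory failures are handled here; unrelated bugs still propagate.
    print(f"memory error: {e}")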
@@ -1,11 +1,9 @@
import json
import os
-from typing import Dict, List, Literal, Optional, Tuple, Union
+from typing import List, Optional, Tuple
import numpy as np
-from pydantic import BaseModel, Field, field_validator
-from pydantic.v1 import validator
-from litellm import embedding
+from pydantic import BaseModel, Field
from tenacity import retry, stop_after_attempt, wait_exponential
from swarms.utils.auto_download_check_packages import (
    auto_check_and_download_package,
@@ -23,75 +21,18 @@ class SwarmType(BaseModel):
    )

api_key = os.getenv("OPENAI_API_KEY")

class SwarmMatcherConfig(BaseModel):
-    backend: Literal["local", "openai"] = "local"
-    model_name: str = (
-        "sentence-transformers/all-MiniLM-L6-v2"  # For local embeddings
-    )
-    openai_model: str = (
-        "text-embedding-3-small"  # Default to newer OpenAI model
-    )
-    embedding_dim: int = 512  # For local embeddings
-    openai_dimensions: Optional[int] = (
-        None  # For OpenAI text-embedding-3-* models
-    )
-    similarity_threshold: float = Field(default=0.5, ge=0.0, le=1.0)
-    cache_embeddings: bool = True
-    max_sequence_length: int = Field(default=512, ge=64, le=2048)
-    device: str = "cpu"  # Only used for local embeddings
-    batch_size: int = Field(default=32, ge=1)
-    openai_api_key: Optional[str] = os.getenv("OPENAI_API_KEY")
-    metadata: Optional[Dict] = Field(
-        default_factory=dict
-    )  # For OpenAI embedding calls
-
-    class Config:
-        validate_assignment = True
-
-    @validator("openai_dimensions")
-    def validate_dimensions(cls, v, values):
-        if values.get("backend") == "openai":
-            if (
-                values.get("openai_model", "").startswith(
-                    "text-embedding-3"
-                )
-                and v is None
-            ):
-                # Default to 1536 for text-embedding-3-small/large if not specified
-                return 1536
-        return v
-
-    @field_validator("openai_model")
-    def validate_model(cls, v, values):
-        if values.get("backend") == "openai":
-            valid_models = [
-                "text-embedding-3-small",
-                "text-embedding-3-large",
-                "text-embedding-ada-002",
-            ]
-            if v not in valid_models:
-                raise ValueError(
-                    f"OpenAI model must be one of: {', '.join(valid_models)}"
-                )
-        return v
+    model_name: str = "sentence-transformers/all-MiniLM-L6-v2"
+    embedding_dim: int = (
+        512  # Dimension of the sentence-transformers model
+    )

class SwarmMatcher:
    """
-    A class for matching tasks to swarm types based on their descriptions using semantic similarity.
-
-    This class uses transformer models to generate embeddings for both task descriptions and swarm type descriptions.
-    It then calculates similarity scores to find the most appropriate swarm type for a given task.
-
-    Features:
-    - Supports both local transformer models and OpenAI embeddings
-    - Implements embedding caching for improved performance
-    - Provides batch processing capabilities
-    - Includes retry mechanisms for API calls
-    - Supports saving/loading swarm type configurations
+    A class for matching tasks to swarm types based on their descriptions.
+    It utilizes a transformer model to generate embeddings for task and swarm type descriptions,
+    and then calculates the dot product to find the best match.
    """

    def __init__(self, config: SwarmMatcherConfig):
@@ -99,35 +40,17 @@ class SwarmMatcher:
        Initializes the SwarmMatcher with a configuration.

        Args:
-            config (SwarmMatcherConfig): Configuration object specifying model settings,
-                similarity thresholds, and other parameters.
-
-        Raises:
-            ImportError: If required dependencies (torch, transformers) are not available
-            Exception: If model initialization fails
+            config (SwarmMatcherConfig): The configuration for the SwarmMatcher.
        """
-        try:
-            self.config = config
-            if self.config.backend == "local":
-                transformers = self._setup_dependencies()
-                self._setup_model_and_tokenizer(transformers)
-            self._initialize_state()
-            self.initialize_swarm_types()
-            logger.debug("SwarmMatcher initialized successfully")
-        except Exception as e:
-            logger.error(f"Error initializing SwarmMatcher: {str(e)}")
-            raise
+        logger.add("swarm_matcher_debug.log", level="DEBUG")
+        logger.debug("Initializing SwarmMatcher")

-    def _setup_dependencies(self):
-        """Set up required dependencies for the SwarmMatcher."""
        try:
-            import numpy as np
            import torch
        except ImportError:
            auto_check_and_download_package(
                "torch", package_manager="pip", upgrade=True
            )
-            import numpy as np
            import torch

        try:
@@ -139,279 +62,64 @@ class SwarmMatcher:
            import transformers

        self.torch = torch
-        self.np = np
-        return transformers
-
-    def _setup_model_and_tokenizer(self, transformers):
-        """Initialize the model and tokenizer."""
-        self.device = self.torch.device(self.config.device)
-        self.tokenizer = transformers.AutoTokenizer.from_pretrained(
-            self.config.model_name
-        )
-        self.model = transformers.AutoModel.from_pretrained(
-            self.config.model_name
-        ).to(self.device)
-
-    def _initialize_state(self):
-        """Initialize internal state variables."""
-        self.swarm_types: List[SwarmType] = []
-        self._embedding_cache = (
-            {} if self.config.cache_embeddings else None
-        )
-
-    def _get_cached_embedding(
-        self, text: str
-    ) -> Optional[np.ndarray]:
-        """
-        Retrieves a cached embedding if available.
-
-        Args:
-            text (str): The text to look up in the cache
-
-        Returns:
-            Optional[np.ndarray]: The cached embedding if found, None otherwise
-        """
-        if self._embedding_cache is not None:
-            return self._embedding_cache.get(text)
-        return None
-
-    def _cache_embedding(self, text: str, embedding: np.ndarray):
-        """
-        Stores an embedding in the cache for future use.
-
-        Args:
-            text (str): The text associated with the embedding
-            embedding (np.ndarray): The embedding vector to cache
-        """
-        if self._embedding_cache is not None:
-            self._embedding_cache[text] = embedding
-
-    def _get_openai_embedding(self, text: str) -> np.ndarray:
-        """Get embedding using OpenAI's API via litellm."""
-        try:
-            params = {
-                "model": self.config.openai_model,
-                "input": [text],
-            }
-            # Add dimensions parameter for text-embedding-3-* models
-            if (
-                self.config.openai_model.startswith(
-                    "text-embedding-3"
-                )
-                and self.config.openai_dimensions
-            ):
-                params["dimensions"] = self.config.openai_dimensions
-            response = embedding(**params)
-            response = response.model_dump()
-            # Handle the response format
-            if "data" in response and len(response["data"]) > 0:
-                embedding_data = response["data"][0]["embedding"]
-            else:
-                raise ValueError(
-                    f"Unexpected response format from OpenAI API: {response}"
-                )
-            embedding_array = np.array(embedding_data)
-            # Log usage information if available
-            if "usage" in response:
-                logger.debug(
-                    f"OpenAI API usage - Prompt tokens: {response['usage'].get('prompt_tokens', 'N/A')}, "
-                    f"Total tokens: {response['usage'].get('total_tokens', 'N/A')}"
-                )
-            return embedding_array
-        except Exception as e:
-            logger.error(f"Error getting OpenAI embedding: {str(e)}")
-            raise
-
-    def _get_openai_embeddings_batch(
-        self, texts: List[str]
-    ) -> np.ndarray:
-        """Get embeddings for a batch of texts using OpenAI's API via litellm."""
-        try:
-            params = {
-                "model": self.config.openai_model,
-                "input": texts,
-            }
-            # Add dimensions parameter for text-embedding-3-* models
-            if (
-                self.config.openai_model.startswith(
-                    "text-embedding-3"
-                )
-                and self.config.openai_dimensions
-            ):
-                params["dimensions"] = self.config.openai_dimensions
-            response = embedding(**params)
-            response = response.model_dump()
-            # Handle the response format
-            if "data" in response:
-                embeddings = [
-                    data["embedding"] for data in response["data"]
-                ]
-            else:
-                raise ValueError(
-                    f"Unexpected response format from OpenAI API: {response}"
-                )
-            # Log usage information if available
-            if "usage" in response:
-                logger.debug(
-                    f"Batch OpenAI API usage - Prompt tokens: {response['usage'].get('prompt_tokens', 'N/A')}, "
-                    f"Total tokens: {response['usage'].get('total_tokens', 'N/A')}"
-                )
-            return np.array(embeddings)
-        except Exception as e:
-            logger.error(
-                f"Error getting OpenAI embeddings batch: {str(e)}"
-            )
-            raise
+        try:
+            self.config = config
+            self.tokenizer = (
+                transformers.AutoTokenizer.from_pretrained(
+                    config.model_name
+                )
+            )
+            self.model = transformers.AutoModel.from_pretrained(
+                config.model_name
+            )
+            self.swarm_types: List[SwarmType] = []
+            logger.debug("SwarmMatcher initialized successfully")
+        except Exception as e:
+            logger.error(f"Error initializing SwarmMatcher: {str(e)}")
+            raise

+    @retry(
+        stop=stop_after_attempt(3),
+        wait=wait_exponential(multiplier=1, min=4, max=10),
+    )
    def get_embedding(self, text: str) -> np.ndarray:
        """
        Generates an embedding for a given text using the configured model.
-        This method first checks the cache for an existing embedding. If not found,
-        it generates a new embedding using either the local transformer model or OpenAI API.

        Args:
-            text (str): The text for which to generate an embedding
+            text (str): The text for which to generate an embedding.

        Returns:
-            np.ndarray: The embedding vector for the text
-
-        Raises:
-            Exception: If embedding generation fails
+            np.ndarray: The embedding vector for the text.
        """
-        # Check cache first
-        cached_embedding = self._get_cached_embedding(text)
-        if cached_embedding is not None:
-            return cached_embedding
        logger.debug(f"Getting embedding for text: {text[:50]}...")
        try:
-            if self.config.backend == "openai":
-                embedding = self._get_openai_embedding(text)
-            else:
            inputs = self.tokenizer(
                text,
                return_tensors="pt",
                padding=True,
                truncation=True,
-                max_length=self.config.max_sequence_length,
+                max_length=512,
            )
-            # Move inputs to device
-            inputs = {
-                k: v.to(self.device) for k, v in inputs.items()
-            }
            with self.torch.no_grad():
                outputs = self.model(**inputs)
            embedding = (
                outputs.last_hidden_state.mean(dim=1)
                .squeeze()
-                .cpu()
                .numpy()
            )
-            # Cache the embedding
-            self._cache_embedding(text, embedding)
            logger.debug("Embedding generated successfully")
            return embedding
        except Exception as e:
            logger.error(f"Error generating embedding: {str(e)}")
            raise

-    def get_embeddings_batch(self, texts: List[str]) -> np.ndarray:
-        """
-        Generate embeddings for multiple texts in batch for improved efficiency.
-        This method processes texts in batches, utilizing the cache where possible
-        and generating new embeddings only for uncached texts.
-
-        Args:
-            texts (List[str]): List of texts to generate embeddings for
-
-        Returns:
-            np.ndarray: Array of embeddings, one for each input text
-
-        Raises:
-            Exception: If batch processing fails
-        """
-        embeddings = []
-        batch_texts = []
-        for text in texts:
-            cached_embedding = self._get_cached_embedding(text)
-            if cached_embedding is not None:
-                embeddings.append(cached_embedding)
-            else:
-                batch_texts.append(text)
-        if batch_texts:
-            if self.config.backend == "openai":
-                batch_embeddings = self._get_openai_embeddings_batch(
-                    batch_texts
-                )
-                for text, embedding in zip(
-                    batch_texts, batch_embeddings
-                ):
-                    self._cache_embedding(text, embedding)
-                    embeddings.append(embedding)
-            else:
-                for i in range(
-                    0, len(batch_texts), self.config.batch_size
-                ):
-                    batch = batch_texts[
-                        i : i + self.config.batch_size
-                    ]
-                    inputs = self.tokenizer(
-                        batch,
-                        return_tensors="pt",
-                        padding=True,
-                        truncation=True,
-                        max_length=self.config.max_sequence_length,
-                    )
-                    inputs = {
-                        k: v.to(self.device)
-                        for k, v in inputs.items()
-                    }
-                    with self.torch.no_grad():
-                        outputs = self.model(**inputs)
-                    batch_embeddings = (
-                        outputs.last_hidden_state.mean(dim=1)
-                        .cpu()
-                        .numpy()
-                    )
-                    for text, embedding in zip(
-                        batch, batch_embeddings
-                    ):
-                        self._cache_embedding(text, embedding)
-                        embeddings.append(embedding)
-        return np.array(embeddings)

    def add_swarm_type(self, swarm_type: SwarmType):
        """
-        Adds a swarm type to the matcher's registry.
-        Generates and stores an embedding for the swarm type's description.
+        Adds a swarm type to the list of swarm types, generating an embedding for its description.

        Args:
-            swarm_type (SwarmType): The swarm type to add
-
-        Raises:
-            Exception: If embedding generation or storage fails
+            swarm_type (SwarmType): The swarm type to add.
        """
        logger.debug(f"Adding swarm type: {swarm_type.name}")
        try:
@@ -427,104 +135,36 @@ class SwarmMatcher:
    def find_best_match(self, task: str) -> Tuple[str, float]:
        """
-        Finds the best matching swarm type for a given task.
-        Uses semantic similarity to compare the task against all registered swarm types
-        and returns the best match along with its confidence score.
+        Finds the best match for a given task among the registered swarm types.

        Args:
-            task (str): The task description to match
+            task (str): The task for which to find the best match.

        Returns:
-            Tuple[str, float]: A tuple containing:
-                - The name of the best matching swarm type
-                - The similarity score (between 0 and 1)
-
-        Raises:
-            Exception: If matching process fails
+            Tuple[str, float]: A tuple containing the name of the best matching swarm type and the score.
        """
        logger.debug(f"Finding best match for task: {task[:50]}...")
        try:
            task_embedding = self.get_embedding(task)
            best_match = None
            best_score = -float("inf")
-            # Get all swarm type embeddings in batch
-            swarm_descriptions = [
-                st.description for st in self.swarm_types
-            ]
-            swarm_embeddings = self.get_embeddings_batch(
-                swarm_descriptions
-            )
-            # Calculate similarity scores in batch
-            scores = np.dot(task_embedding, swarm_embeddings.T)
-            best_idx = np.argmax(scores)
-            best_score = float(scores[best_idx])
-            best_match = self.swarm_types[best_idx]
-            if best_score < self.config.similarity_threshold:
-                logger.warning(
-                    f"Best match score {best_score} is below threshold {self.config.similarity_threshold}"
-                )
+            for swarm_type in self.swarm_types:
+                score = np.dot(
+                    task_embedding, np.array(swarm_type.embedding)
+                )
+                if score > best_score:
+                    best_score = score
+                    best_match = swarm_type
            logger.info(
                f"Best match for task: {best_match.name} (score: {best_score})"
            )
-            return best_match.name, best_score
+            return best_match.name, float(best_score)
        except Exception as e:
            logger.error(
                f"Error finding best match for task: {str(e)}"
            )
            raise

-    def find_top_k_matches(
-        self, task: str, k: int = 3
-    ) -> List[Tuple[str, float]]:
-        """
-        Finds the top k matching swarm types for a given task.
-        Returns all matches that exceed the similarity threshold, sorted by score.
-
-        Args:
-            task (str): The task for which to find matches.
-            k (int): Number of top matches to return.
-
-        Returns:
-            List[Tuple[str, float]]: List of tuples containing swarm names and their scores.
-        """
-        logger.debug(
-            f"Finding top {k} matches for task: {task[:50]}..."
-        )
-        try:
-            task_embedding = self.get_embedding(task)
-            swarm_descriptions = [
-                st.description for st in self.swarm_types
-            ]
-            swarm_embeddings = self.get_embeddings_batch(
-                swarm_descriptions
-            )
-            # Calculate similarity scores in batch
-            scores = np.dot(task_embedding, swarm_embeddings.T)
-            top_k_indices = np.argsort(scores)[-k:][::-1]
-            results = []
-            for idx in top_k_indices:
-                score = float(scores[idx])
-                if score >= self.config.similarity_threshold:
-                    results.append(
-                        (self.swarm_types[idx].name, score)
-                    )
-            logger.info(
-                f"Found {len(results)} matches above threshold"
-            )
-            return results
-        except Exception as e:
-            logger.error(f"Error finding top matches: {str(e)}")
-            raise

    def auto_select_swarm(self, task: str) -> str:
        """
        Automatically selects the best swarm type for a given task based on their descriptions.
@@ -586,7 +226,8 @@ class SwarmMatcher:
logger.error(f"Error loading swarm types: {str(e)}")
raise
-    def initialize_swarm_types(self):
+def initialize_swarm_types(matcher: SwarmMatcher):
    logger.debug("Initializing swarm types")
    swarm_types = [
        SwarmType(
@@ -611,57 +252,30 @@ class SwarmMatcher:
        ),
        SwarmType(
            name="HierarchicalSwarm",
-            description="Organize agents in a hierarchical structure with clear reporting lines and delegation of responsibilities. Keywords: management hierarchy, organizational structure, delegation, supervision, chain of command, tiered organization, structured coordination, leadership roles",
-        ),
-        SwarmType(
-            name="AdaptiveSwarm",
-            description="Dynamically adjust agent behavior and swarm configuration based on task requirements and performance feedback. Keywords: dynamic adaptation, self-optimization, feedback loops, learning systems, flexible configuration, responsive behavior, adaptive algorithms, real-time adjustment",
-        ),
-        SwarmType(
-            name="ConsensusSwarm",
-            description="Achieve group decisions through consensus mechanisms and voting protocols among multiple agents. Keywords: group decision making, voting systems, collective intelligence, agreement protocols, democratic processes, collaborative decisions, consensus building",
-        ),
-        SwarmType(
-            name="CouncilAsAJudge",
-            description="Evaluate and judge solutions or decisions through a council of expert agents acting as arbitrators. Keywords: evaluation, judgment, arbitration, expert assessment, quality control, decision validation, peer review, consensus building",
-        ),
-        SwarmType(
-            name="MALT",
-            description="Multi-Agent Language Tasks framework for coordinating language-based operations across multiple specialized agents. Keywords: language processing, task coordination, linguistic analysis, communication protocols, semantic understanding, natural language tasks",
-        ),
-        SwarmType(
-            name="GroupChat",
-            description="Enable dynamic multi-agent conversations and collaborative problem-solving through structured group discussions. Keywords: collaborative dialogue, group interaction, team communication, collective problem-solving, discussion facilitation, knowledge sharing",
-        ),
-        SwarmType(
-            name="MultiAgentRouter",
-            description="Intelligently route tasks and information between agents based on their specializations and current workload. Keywords: task distribution, load balancing, intelligent routing, agent specialization, workflow optimization, resource allocation",
-        ),
-        SwarmType(
-            name="MajorityVoting",
-            description="Make decisions through democratic voting mechanisms where multiple agents contribute their opinions and votes. Keywords: collective decision-making, democratic process, vote aggregation, opinion pooling, consensus building, collaborative choice",
+            description="Organize agents in a hierarchical structure with clear reporting lines and delegation of responsibilities. Keywords: management hierarchy, organizational structure, delegation, supervision, chain of command, tiered organization, structured coordination",
        ),
+        # SwarmType(
+        #     name="AdaptiveSwarm",
+        #     description="Dynamically adjust agent behavior and swarm configuration based on task requirements and performance feedback. Keywords: dynamic adaptation, self-optimization, feedback loops, learning systems, flexible configuration, responsive behavior, adaptive algorithms",
+        # ),
+        # SwarmType(
+        #     name="ConsensusSwarm",
+        #     description="Achieve group decisions through consensus mechanisms and voting protocols among multiple agents. Keywords: group decision making, voting systems, collective intelligence, agreement protocols, democratic processes, collaborative decisions",
+        # ),
    ]
-    try:
-        for swarm_type in swarm_types:
-            self.add_swarm_type(swarm_type)
-    except Exception as e:
-        logger.error(f"Error initializing swarm types: {str(e)}")
-        raise
+    for swarm_type in swarm_types:
+        matcher.add_swarm_type(swarm_type)
+    logger.debug("Swarm types initialized")

-def swarm_matcher(task: Union[str, List[str]], *args, **kwargs):
+def swarm_matcher(task: str, *args, **kwargs):
    """
    Runs the SwarmMatcher example with predefined tasks and swarm types.
    """
-    if isinstance(task, list):
-        task = "".join(task)
-    else:
-        task = task
    config = SwarmMatcherConfig()
    matcher = SwarmMatcher(config)
+    initialize_swarm_types(matcher)
    # matcher.save_swarm_types(f"swarm_logs/{uuid4().hex}.json")
@@ -672,18 +286,319 @@ def swarm_matcher(task: Union[str, List[str]], *args, **kwargs):
return swarm_type
-# # Example usage
-# if __name__ == "__main__":
-#     # Create configuration
-#     config = SwarmMatcherConfig(
-#         backend="openai",  # Using local embeddings for this example
-#         similarity_threshold=0.6,  # Increase threshold for more strict matching
-#         cache_embeddings=True,
# from typing import List, Tuple, Dict
# from pydantic import BaseModel, Field
# from loguru import logger
# from uuid import uuid4
# import chromadb
# import json
# from tenacity import retry, stop_after_attempt, wait_exponential
# class SwarmType(BaseModel):
# """A swarm type with its name, description and optional metadata"""
# id: str = Field(default_factory=lambda: str(uuid4()))
# name: str
# description: str
# metadata: Dict = Field(default_factory=dict)
# class SwarmMatcherConfig(BaseModel):
# """Configuration for the SwarmMatcher"""
# collection_name: str = "swarm_types"
# distance_metric: str = "cosine" # or "l2" or "ip"
# embedding_function: str = (
# "sentence-transformers/all-mpnet-base-v2" # Better model than MiniLM
# )
# persist_directory: str = "./chroma_db"
# class SwarmMatcher:
# """
# An improved swarm matcher that uses ChromaDB for better vector similarity search.
# Features:
# - Persistent storage of embeddings
# - Better vector similarity search with multiple distance metrics
# - Improved embedding model
# - Metadata filtering capabilities
# - Batch operations support
# """
# def __init__(self, config: SwarmMatcherConfig):
# """Initialize the improved swarm matcher"""
# logger.add("swarm_matcher.log", rotation="100 MB")
# self.config = config
# # Initialize ChromaDB client with persistence
# self.chroma_client = chromadb.Client()
# # Get or create collection
# try:
# self.collection = self.chroma_client.get_collection(
# name=config.collection_name,
# )
# except ValueError:
# self.collection = self.chroma_client.create_collection(
# name=config.collection_name,
# metadata={"hnsw:space": config.distance_metric},
# )
# # Initialize matcher
# logger.info(
# f"Initialized SwarmMatcher with collection '{config.collection_name}'"
# )
# def add_swarm_type(self, swarm_type: SwarmType) -> None:
# """Add a single swarm type to the collection"""
# try:
# self.collection.add(
# ids=[swarm_type.id],
# documents=[swarm_type.description],
# metadatas=[
# {"name": swarm_type.name, **swarm_type.metadata}
# ],
# )
# logger.info(f"Added swarm type: {swarm_type.name}")
# except Exception as e:
# logger.error(
# f"Error adding swarm type {swarm_type.name}: {str(e)}"
# )
# raise
# def add_swarm_types(self, swarm_types: List[SwarmType]) -> None:
# """Add multiple swarm types in batch"""
# try:
# self.collection.add(
# ids=[st.id for st in swarm_types],
# documents=[st.description for st in swarm_types],
# metadatas=[
# {"name": st.name, **st.metadata}
# for st in swarm_types
# ],
# )
# logger.info(f"Added {len(swarm_types)} swarm types")
# except Exception as e:
# logger.error(
# f"Error adding swarm types in batch: {str(e)}"
# )
# raise
# @retry(
# stop=stop_after_attempt(3),
# wait=wait_exponential(multiplier=1, min=4, max=10),
# )
# def find_best_matches(
# self,
# task: str,
# n_results: int = 3,
# score_threshold: float = 0.7,
# ) -> List[Tuple[str, float]]:
# """
# Find the best matching swarm types for a given task
# Returns multiple matches with their scores
# """
# try:
# results = self.collection.query(
# query_texts=[task],
# n_results=n_results,
# include=["metadatas", "distances"],
# )
# matches = []
# for metadata, distance in zip(
# results["metadatas"][0], results["distances"][0]
# ):
# # Convert distance to similarity score (1 - normalized_distance)
# score = 1 - (
# distance / 2
# ) # Normalize cosine distance to [0,1]
# if score >= score_threshold:
# matches.append((metadata["name"], score))
# logger.info(f"Found {len(matches)} matches for task")
# return matches
# except Exception as e:
# logger.error(f"Error finding matches for task: {str(e)}")
# raise
# def auto_select_swarm(self, task: str) -> str:
# """
# Automatically select the best swarm type for a task
# Returns only the top match
# """
# matches = self.find_best_matches(task, n_results=1)
# if not matches:
# logger.warning("No suitable matches found for task")
# return "SequentialWorkflow" # Default fallback
# best_match, score = matches[0]
# logger.info(
# f"Selected swarm type '{best_match}' with confidence {score:.3f}"
# )
# return best_match
# def run_multiple(self, tasks: List[str]) -> List[str]:
# """Process multiple tasks in batch"""
# return [self.auto_select_swarm(task) for task in tasks]
# def save_swarm_types(self, filename: str) -> None:
# """Export swarm types to JSON"""
# try:
# all_data = self.collection.get(
# include=["metadatas", "documents"]
# )
# swarm_types = [
# SwarmType(
# id=id_,
# name=metadata["name"],
# description=document,
# metadata={
# k: v
# for k, v in metadata.items()
# if k != "name"
# },
# )
# for id_, metadata, document in zip(
# all_data["ids"],
# all_data["metadatas"],
# all_data["documents"],
# )
# ]
# with open(filename, "w") as f:
# json.dump(
# [st.dict() for st in swarm_types], f, indent=2
# )
# logger.info(f"Saved swarm types to {filename}")
# except Exception as e:
# logger.error(f"Error saving swarm types: {str(e)}")
# raise
# def load_swarm_types(self, filename: str) -> None:
# """Import swarm types from JSON"""
# try:
# with open(filename, "r") as f:
# swarm_types_data = json.load(f)
# swarm_types = [SwarmType(**st) for st in swarm_types_data]
# self.add_swarm_types(swarm_types)
# logger.info(f"Loaded swarm types from {filename}")
# except Exception as e:
# logger.error(f"Error loading swarm types: {str(e)}")
# raise
# def initialize_default_swarm_types(matcher: SwarmMatcher) -> None:
# """Initialize the matcher with default swarm types"""
# swarm_types = [
# SwarmType(
# name="AgentRearrange",
# description="""
# Optimize agent order and rearrange flow for multi-step tasks, ensuring efficient task allocation
# and minimizing bottlenecks. Specialized in orchestration, coordination, pipeline optimization,
# task scheduling, resource allocation, workflow management, agent organization, and process optimization.
# Best for tasks requiring complex agent interactions and workflow optimization.
# """,
# metadata={
# "category": "optimization",
# "complexity": "high",
# },
# ),
# SwarmType(
# name="MixtureOfAgents",
# description="""
# Combine diverse expert agents for comprehensive analysis, fostering a collaborative approach
# to problem-solving and leveraging individual strengths. Focuses on multi-agent systems,
# expert collaboration, distributed intelligence, collective problem solving, agent specialization,
# team coordination, hybrid approaches, and knowledge synthesis. Ideal for complex problems
# requiring multiple areas of expertise.
# """,
# metadata={
# "category": "collaboration",
# "complexity": "high",
# },
# ),
# SwarmType(
# name="SpreadSheetSwarm",
# description="""
# Collaborative data processing and analysis in a spreadsheet-like environment, facilitating
# real-time data sharing and visualization. Specializes in data analysis, tabular processing,
# collaborative editing, data transformation, spreadsheet operations, data visualization,
# real-time collaboration, and structured data handling. Perfect for data-intensive tasks
# requiring structured analysis.
# """,
# metadata={
# "category": "data_processing",
# "complexity": "medium",
# },
# ),
# SwarmType(
# name="SequentialWorkflow",
# description="""
# Execute tasks in a step-by-step, sequential process workflow, ensuring a logical and methodical
# approach to task execution. Focuses on linear processing, waterfall methodology, step-by-step
# execution, ordered tasks, sequential operations, process flow, systematic approach, and staged
# execution. Best for tasks requiring strict order and dependencies.
# """,
# metadata={"category": "workflow", "complexity": "low"},
# ),
# SwarmType(
# name="ConcurrentWorkflow",
# description="""
# Process multiple tasks or data sources concurrently in parallel, maximizing productivity
# and reducing processing time. Specializes in parallel processing, multi-threading,
# asynchronous execution, distributed computing, concurrent operations, simultaneous tasks,
# parallel workflows, and scalable processing. Ideal for independent tasks that can be
# processed simultaneously.
# """,
# metadata={"category": "workflow", "complexity": "medium"},
# ),
# ]
# matcher.add_swarm_types(swarm_types)
# logger.info("Initialized default swarm types")
# def create_swarm_matcher(
# persist_dir: str = "./chroma_db",
# collection_name: str = "swarm_types",
# ) -> SwarmMatcher:
# """Convenience function to create and initialize a swarm matcher"""
# config = SwarmMatcherConfig(
# persist_directory=persist_dir, collection_name=collection_name
# )
# matcher = SwarmMatcher(config)
# initialize_default_swarm_types(matcher)
# return matcher
# task = "I need to concurrently run 1000 tasks"
# print(matcher.auto_select_swarm(task))
# # Example usage
# def swarm_matcher(task: str) -> str:
# # Create and initialize matcher
# matcher = create_swarm_matcher()
# swarm_type = matcher.auto_select_swarm(task)
# print(f"Task: {task}\nSelected Swarm: {swarm_type}\n")
# return swarm_type
# # # Example usage
# # if __name__ == "__main__":
# # # Create and initialize matcher
# # matcher = create_swarm_matcher()
# # # Example tasks
# # tasks = [
# # "Analyze this spreadsheet of sales data and create visualizations",
# # "Coordinate multiple AI agents to solve a complex problem",
# # "Process these tasks one after another in a specific order",
# # "Write multiple blog posts about the latest advancements in swarm intelligence all at once",
# # "Write a blog post about the latest advancements in swarm intelligence",
# # ]
# # # Process tasks
# # for task in tasks:
# # swarm_type = matcher.auto_select_swarm(task)
# # print(f"Task: {task}\nSelected Swarm: {swarm_type}\n")

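Taken together, the restored module is driven in three steps: build a config, construct the matcher, then register swarm types before matching. A minimal usage sketch, assuming the definitions above are in scope:

config = SwarmMatcherConfig()
matcher = SwarmMatcher(config)
initialize_swarm_types(matcher)  # module-level function after this change

# find_best_match returns (name, score); the score is a raw dot product of
# unnormalized embeddings, so it ranks swarm types but is not a [0, 1] similarity.
name, score = matcher.find_best_match(
    "Process these tasks one after another in a specific order"
)
print(name, score)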
@@ -5,7 +5,7 @@ import sys
import traceback
from dataclasses import dataclass
from datetime import datetime
-from typing import Any, Dict, List, Optional, Tuple
+from typing import Any, Dict, List, Optional
import psutil
import requests
@@ -95,22 +95,8 @@ class SwarmsIssueReporter:
        except:
            return "Unknown"

-    def _get_gpu_info(self) -> Tuple[bool, Optional[str]]:
-        """Get GPU information and CUDA availability."""
-        try:
-            import torch
-
-            cuda_available = torch.cuda.is_available()
-            if cuda_available:
-                gpu_info = torch.cuda.get_device_name(0)
-                return cuda_available, gpu_info
-            return False, None
-        except:
-            return False, None
-
    def _get_system_info(self) -> SwarmSystemInfo:
        """Collect system and Swarms-specific information."""
-        cuda_available, gpu_info = self._get_gpu_info()
        return SwarmSystemInfo(
            os_name=platform.system(),
@@ -120,8 +106,6 @@ class SwarmsIssueReporter:
            memory_usage=psutil.virtual_memory().percent,
            disk_usage=psutil.disk_usage("/").percent,
            swarms_version=self._get_swarms_version(),
-            cuda_available=cuda_available,
-            gpu_info=gpu_info,
        )

    def _categorize_error(

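With the GPU probe removed, _get_system_info reduces to platform and psutil lookups. A minimal sketch of the remaining probes (field names follow the diff above):

import platform
import psutil

print(platform.system())                # os_name
print(psutil.virtual_memory().percent)  # memory_usage
print(psutil.disk_usage("/").percent)   # disk_usage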