parent 8b78ab2bba
commit dded54393c
@@ -1,49 +0,0 @@
import os

from swarms import Agent
from swarm_models import OpenAIChat
from swarms.prompts.prompt_generator_optimizer import (
    prompt_generator_sys_prompt,
)
from dotenv import load_dotenv
from swarms.agents.prompt_generator_agent import PromptGeneratorAgent

load_dotenv()

# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")

# Create an instance of the OpenAIChat class
model = OpenAIChat(
    openai_api_key=api_key,
    model_name="gpt-4o-mini",
    temperature=0.1,
    max_tokens=2000,
)

# Initialize the agent
agent = Agent(
    agent_name="Prompt-Optimizer",
    system_prompt=prompt_generator_sys_prompt.get_prompt(),
    llm=model,
    max_loops=1,
    autosave=True,
    dashboard=False,
    verbose=True,
    dynamic_temperature_enabled=True,
    saved_state_path="optimizer_agent.json",
    user_name="swarms_corp",
    retry_attempts=1,
    context_length=200000,
    return_step_meta=False,
    # output_type="json",
    output_type="string",
)

# Wrap the configured agent in the prompt generator
prompt_generator = PromptGeneratorAgent(agent)

# Run the agent
prompt_generator.run(
    "Generate an amazing prompt for analyzing healthcare insurance documents"
)

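The script above reads OPENAI_API_KEY from a .env file via load_dotenv(), and PromptGeneratorAgent appears to be a thin wrapper around the configured Agent. As a rough sketch (assuming the usual swarms Agent.run(task) interface, not confirmed by this diff), the underlying agent can also be driven directly:

# Sketch only: invoke the configured Agent directly, bypassing the
# PromptGeneratorAgent wrapper. Assumes Agent.run(task) returns the model output.
raw_output = agent.run(
    "Generate an amazing prompt for analyzing healthcare insurance documents"
)
print(raw_output)
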
@@ -1,271 +0,0 @@
from typing import List, Tuple, Optional
import numpy as np
import torch
from transformers import AutoTokenizer, AutoModel
from pydantic import BaseModel, Field
from loguru import logger
import json
from tenacity import retry, stop_after_attempt, wait_exponential

# Ensure you have the necessary libraries installed:
# pip install torch transformers pydantic loguru tenacity


class SwarmType(BaseModel):
    name: str
    description: str
    embedding: Optional[List[float]] = Field(
        default=None, exclude=True
    )


class SwarmMatcherConfig(BaseModel):
    model_name: str = "sentence-transformers/all-MiniLM-L6-v2"
    embedding_dim: int = (
        384  # Output dimension of the all-MiniLM-L6-v2 sentence-transformers model
    )


class SwarmMatcher:
    """
    A class for matching tasks to swarm types based on their descriptions.
    It uses a transformer model to generate embeddings for task and swarm type
    descriptions, then scores candidates with a dot product to find the best match.
    """

    def __init__(self, config: Optional[SwarmMatcherConfig] = None):
        """
        Initializes the SwarmMatcher with a configuration.

        Args:
            config (Optional[SwarmMatcherConfig]): The configuration for the
                SwarmMatcher. Defaults to a fresh SwarmMatcherConfig.
        """
        logger.add("swarm_matcher_debug.log", level="DEBUG")
        logger.debug("Initializing SwarmMatcher")
        try:
            self.config = config or SwarmMatcherConfig()
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.config.model_name
            )
            self.model = AutoModel.from_pretrained(self.config.model_name)
            self.swarm_types: List[SwarmType] = []
            logger.debug("SwarmMatcher initialized successfully")
        except Exception as e:
            logger.error(f"Error initializing SwarmMatcher: {str(e)}")
            raise

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
    )
    def get_embedding(self, text: str) -> np.ndarray:
        """
        Generates an embedding for a given text using the configured model.

        Args:
            text (str): The text for which to generate an embedding.

        Returns:
            np.ndarray: The embedding vector for the text.
        """
        logger.debug(f"Getting embedding for text: {text[:50]}...")
        try:
            inputs = self.tokenizer(
                text,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=512,
            )
            with torch.no_grad():
                outputs = self.model(**inputs)
            # Mean-pool the last hidden state to get a single vector per text
            embedding = (
                outputs.last_hidden_state.mean(dim=1)
                .squeeze()
                .numpy()
            )
            logger.debug("Embedding generated successfully")
            return embedding
        except Exception as e:
            logger.error(f"Error generating embedding: {str(e)}")
            raise

    def add_swarm_type(self, swarm_type: SwarmType):
        """
        Adds a swarm type to the list of swarm types, generating an embedding for its description.

        Args:
            swarm_type (SwarmType): The swarm type to add.
        """
        logger.debug(f"Adding swarm type: {swarm_type.name}")
        try:
            embedding = self.get_embedding(swarm_type.description)
            swarm_type.embedding = embedding.tolist()
            self.swarm_types.append(swarm_type)
            logger.info(f"Added swarm type: {swarm_type.name}")
        except Exception as e:
            logger.error(
                f"Error adding swarm type {swarm_type.name}: {str(e)}"
            )
            raise

    def find_best_match(self, task: str) -> Tuple[str, float]:
        """
        Finds the best match for a given task among the registered swarm types.

        Args:
            task (str): The task for which to find the best match.

        Returns:
            Tuple[str, float]: A tuple containing the name of the best matching swarm type and the score.
        """
        logger.debug(f"Finding best match for task: {task[:50]}...")
        try:
            task_embedding = self.get_embedding(task)
            best_match = None
            best_score = -float("inf")
            for swarm_type in self.swarm_types:
                score = np.dot(
                    task_embedding, np.array(swarm_type.embedding)
                )
                if score > best_score:
                    best_score = score
                    best_match = swarm_type
            if best_match is None:
                # No swarm types registered; signal "no match" to callers
                logger.warning("No swarm types registered; cannot match task")
                return "No match", 0.0
            logger.info(
                f"Best match for task: {best_match.name} (score: {best_score})"
            )
            return best_match.name, float(best_score)
        except Exception as e:
            logger.error(
                f"Error finding best match for task: {str(e)}"
            )
            raise

    def auto_select_swarm(self, task: str) -> str:
        """
        Automatically selects the best swarm type for a given task based on their descriptions.

        Args:
            task (str): The task for which to select a swarm type.

        Returns:
            str: The name of the selected swarm type.
        """
        logger.debug(f"Auto-selecting swarm for task: {task[:50]}...")
        best_match, score = self.find_best_match(task)

        if best_match == "No match":
            # Handle the case where no match was found
            logger.info(f"Task: {task}")
            logger.info("No suitable swarm type found.")
            return "No suitable swarm type found"

        logger.info(f"Task: {task}")
        logger.info(f"Selected Swarm Type: {best_match}")
        logger.info(f"Confidence Score: {score:.2f}")
        return best_match

    def run_multiple(self, tasks: List[str], *args, **kwargs) -> List[str]:
        """
        Auto-selects a swarm type for each task in a list of tasks.

        Args:
            tasks (List[str]): The tasks to match.

        Returns:
            List[str]: The selected swarm type name for each task.
        """
        swarms = []

        for task in tasks:
            output = self.auto_select_swarm(task)

            # Append the selected swarm type name
            swarms.append(output)

        return swarms

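    # Note (a sketch, not part of the original logic): find_best_match above
    # ranks swarm types by a raw dot product, so scores are unbounded and grow
    # with embedding magnitude. A cosine-similarity variant would normalize
    # both vectors first, keeping scores in [-1, 1] and making the logged
    # "Confidence Score" comparable across tasks of different lengths, e.g.:
    #
    #   a = task_embedding / np.linalg.norm(task_embedding)
    #   b = np.array(swarm_type.embedding)
    #   score = float(np.dot(a, b / np.linalg.norm(b)))
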
    def save_swarm_types(self, filename: str):
        """
        Saves the registered swarm types to a JSON file.

        Args:
            filename (str): The name of the file to which to save the swarm types.
        """
        try:
            with open(filename, "w") as f:
                # The embedding field is excluded from serialization, so only
                # names and descriptions are written
                json.dump([st.dict() for st in self.swarm_types], f)
            logger.info(f"Saved swarm types to {filename}")
        except Exception as e:
            logger.error(f"Error saving swarm types: {str(e)}")
            raise

    def load_swarm_types(self, filename: str):
        """
        Loads swarm types from a JSON file.

        Args:
            filename (str): The name of the file from which to load the swarm types.
        """
        try:
            with open(filename, "r") as f:
                swarm_types_data = json.load(f)
            self.swarm_types = [
                SwarmType(**st) for st in swarm_types_data
            ]
            logger.info(f"Loaded swarm types from {filename}")
        except Exception as e:
            logger.error(f"Error loading swarm types: {str(e)}")
            raise

    def initialize_swarm_types(self):
        """Registers the built-in swarm types and embeds their descriptions."""
        logger.debug("Initializing swarm types")
        swarm_types = [
            SwarmType(
                name="AgentRearrange",
                description="Optimize agent order and rearrange flow for multi-step tasks, ensuring efficient task allocation and minimizing bottlenecks",
            ),
            SwarmType(
                name="MixtureOfAgents",
                description="Combine diverse expert agents for comprehensive analysis, fostering a collaborative approach to problem-solving and leveraging individual strengths",
            ),
            SwarmType(
                name="SpreadSheetSwarm",
                description="Collaborative data processing and analysis in a spreadsheet-like environment, facilitating real-time data sharing and visualization",
            ),
            SwarmType(
                name="SequentialWorkflow",
                description="Execute tasks in a step-by-step, sequential process workflow, ensuring a logical and methodical approach to task execution",
            ),
            SwarmType(
                name="ConcurrentWorkflow",
                description="Process multiple tasks or data sources concurrently in parallel, maximizing productivity and reducing processing time",
            ),
        ]

        for swarm_type in swarm_types:
            self.add_swarm_type(swarm_type)

        # logger.debug("Swarm types initialized")


# Example usage
if __name__ == "__main__":
    # logger.info("Starting SwarmMatcher example")
    matcher = SwarmMatcher()

    # Register the built-in swarm types before matching
    matcher.initialize_swarm_types()

    # Save swarm types for future use
    # matcher.save_swarm_types("swarm_types.json")

    tasks = [
        # "Optimize the order of agents for a complex financial analysis task",
        # "Combine insights from various expert agents to evaluate a potential acquisition",
        # "Process and analyze customer churn data collaboratively",
        # "Generate a comprehensive due diligence report step by step",
        "Analyze multiple data sources concurrently for market research"
    ]

    for task in tasks:
        selected_swarm = matcher.auto_select_swarm(task)
        # print("\n" + "-"*50 + "\n")
        print(selected_swarm)

    # # Load swarm types in a new session
    # new_matcher = SwarmMatcher(config)
    # new_matcher.load_swarm_types("swarm_types.json")
    # print("Loaded swarm types:", [st.name for st in new_matcher.swarm_types])

    # logger.info("SwarmMatcher example completed successfully")
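A caveat on the commented-out persistence example above: because SwarmType.embedding is declared with exclude=True, save_swarm_types writes only names and descriptions, so a matcher that reloads swarm_types.json has no vectors to match against. A minimal sketch of a reload that regenerates them (assuming the class as defined above and a swarm_types.json previously produced by save_swarm_types):

# Sketch only: reload persisted swarm types and recompute their embeddings,
# since the embedding field is excluded from serialization above.
new_matcher = SwarmMatcher()
new_matcher.load_swarm_types("swarm_types.json")
for st in new_matcher.swarm_types:
    st.embedding = new_matcher.get_embedding(st.description).tolist()
print("Loaded swarm types:", [st.name for st in new_matcher.swarm_types])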