diff --git a/deep_research_swarm_example.py b/deep_research_swarm_example.py
new file mode 100644
index 00000000..f20358ff
--- /dev/null
+++ b/deep_research_swarm_example.py
@@ -0,0 +1,13 @@
+from swarms.structs.deep_research_swarm import DeepResearchSwarm
+
+
+swarm = DeepResearchSwarm(
+ name="Deep Research Swarm",
+ description="A swarm of agents that can perform deep research on a given topic",
+    output_type="all",  # Set output_type="string" for more readable output
+)
+
+out = swarm.run(
+ "What are the latest developments and news in the AI and cryptocurrency space?"
+)
+print(out)
diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml
index d4e9725c..9a4a8823 100644
--- a/docs/mkdocs.yml
+++ b/docs/mkdocs.yml
@@ -358,25 +358,25 @@ nav:
- Finance Swarm: "swarms/examples/swarms_api_finance.md"
- ML Model Code Generation Swarm: "swarms/examples/swarms_api_ml_model.md"
- - Swarm Models:
- - Overview: "swarms/models/index.md"
- # - Models Available: "swarms/models/index.md"
- # - Available Models from OpenAI, Huggingface, TogetherAI, and more: "swarms/models/models_available_overview.md"
- # - Model Router
- - Quickstart: "swarms/models/models_available_overview.md"
- - How to Create A Custom Language Model: "swarms/models/custom_model.md"
- - Language Models:
- - BaseLLM: "swarms/models/base_llm.md"
- - HuggingFaceLLM: "swarms/models/huggingface.md"
- - Anthropic: "swarms/models/anthropic.md"
- - OpenAIChat: "swarms/models/openai.md"
- - OpenAIFunctionCaller: "swarms/models/openai_function_caller.md"
- - Groq: "swarms/models/groq.md"
- - Cerebras: "swarms/models/cerebras.md"
- - MultiModal Models:
- - BaseMultiModalModel: "swarms/models/base_multimodal_model.md"
- - Multi Modal Models Available: "swarms/models/multimodal_models.md"
- - GPT4VisionAPI: "swarms/models/gpt4v.md"
+ # - Swarm Models:
+ # - Overview: "swarms/models/index.md"
+ # # - Models Available: "swarms/models/index.md"
+ # # - Available Models from OpenAI, Huggingface, TogetherAI, and more: "swarms/models/models_available_overview.md"
+ # # - Model Router
+ # - Quickstart: "swarms/models/models_available_overview.md"
+ # - How to Create A Custom Language Model: "swarms/models/custom_model.md"
+ # - Language Models:
+ # - BaseLLM: "swarms/models/base_llm.md"
+ # - HuggingFaceLLM: "swarms/models/huggingface.md"
+ # - Anthropic: "swarms/models/anthropic.md"
+ # - OpenAIChat: "swarms/models/openai.md"
+ # - OpenAIFunctionCaller: "swarms/models/openai_function_caller.md"
+ # - Groq: "swarms/models/groq.md"
+ # - Cerebras: "swarms/models/cerebras.md"
+ # - MultiModal Models:
+ # - BaseMultiModalModel: "swarms/models/base_multimodal_model.md"
+ # - Multi Modal Models Available: "swarms/models/multimodal_models.md"
+ # - GPT4VisionAPI: "swarms/models/gpt4v.md"
- Swarms Cloud API:
- Overview: "swarms_cloud/swarms_api.md"
- Swarms API as MCP: "swarms_cloud/mcp.md"
diff --git a/docs/swarms/structs/swarm_router.md b/docs/swarms/structs/swarm_router.md
index cf0a87e9..03de7071 100644
--- a/docs/swarms/structs/swarm_router.md
+++ b/docs/swarms/structs/swarm_router.md
@@ -17,6 +17,7 @@ The `SwarmRouter` class is a flexible routing system designed to manage differen
| `MALT` | Multi-Agent Language Tasks |
| `DeepResearchSwarm` | Specialized for deep research tasks |
| `CouncilAsAJudge` | Council-based judgment system |
+| `InteractiveGroupChat` | Interactive group chat with user participation |
| `auto` | Automatically selects best swarm type via embedding search |
## Classes
@@ -466,6 +467,25 @@ auto_router = SwarmRouter(
result = auto_router.run("Conduct a comprehensive market analysis for Product X")
```
+### InteractiveGroupChat
+
+Use Case: Interactive group discussions with user participation.
+
+```python
+interactive_chat_router = SwarmRouter(
+ name="InteractiveGroupChat",
+ description="Interactive group chat with user participation",
+ max_loops=10,
+ agents=[financial_analyst, market_researcher, competitor_analyst],
+ swarm_type="InteractiveGroupChat",
+ output_type="string"
+)
+
+result = interactive_chat_router.run("Discuss the market trends and provide interactive analysis")
+```
+
+The InteractiveGroupChat enables dynamic interaction between agents and users, allowing real-time participation in group discussions and decision-making. It is particularly useful when human input or validation is needed during the conversation flow.
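+
+As a minimal sketch (assuming the router instance retains its agents and configuration between calls), a follow-up user turn can be sent through the same `run` method:
+
+```python
+followup = interactive_chat_router.run(
+    "User: please validate the competitor analysis before finalizing"
+)
+print(followup)
+```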
+
## Advanced Features
### Processing Documents
diff --git a/exa_search_test.py b/exa_search_test.py
new file mode 100644
index 00000000..7eba6e9e
--- /dev/null
+++ b/exa_search_test.py
@@ -0,0 +1,4 @@
+from swarms.structs.deep_research_swarm import exa_search
+
+
+print(exa_search("What are the best multi-agent frameworks"))
diff --git a/swarm_matcher_example.py b/swarm_matcher_example.py
new file mode 100644
index 00000000..13e8446a
--- /dev/null
+++ b/swarm_matcher_example.py
@@ -0,0 +1,23 @@
+from swarms.structs.swarm_matcher import (
+ SwarmMatcher,
+ SwarmMatcherConfig,
+)
+from dotenv import load_dotenv
+
+load_dotenv()
+
+# Example usage
+if __name__ == "__main__":
+ # Create configuration
+ config = SwarmMatcherConfig(
+ backend="openai", # Using local embeddings for this example
+ similarity_threshold=0.6, # Increase threshold for more strict matching
+ cache_embeddings=True,
+ )
+
+ # Initialize matcher
+ matcher = SwarmMatcher(config)
+
+ task = "I need to build a hierarchical swarm of agents to solve a problem"
+
+ print(matcher.auto_select_swarm(task))
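+
+    # The matcher can also surface several candidates above the similarity
+    # threshold (a sketch; scores vary by embedding backend):
+    # print(matcher.find_top_k_matches(task, k=3))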
diff --git a/swarms/tools/tool_schema_base_model.py b/swarms/schemas/tool_schema_base_model.py
similarity index 100%
rename from swarms/tools/tool_schema_base_model.py
rename to swarms/schemas/tool_schema_base_model.py
diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py
index eae93084..dce3c2c2 100644
--- a/swarms/structs/agent.py
+++ b/swarms/structs/agent.py
@@ -60,6 +60,7 @@ from swarms.utils.any_to_str import any_to_str
from swarms.utils.data_to_text import data_to_text
from swarms.utils.file_processing import create_file_in_folder
from swarms.utils.formatter import formatter
+from swarms.utils.generate_keys import generate_api_key
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
@@ -444,7 +445,7 @@ class Agent:
self.system_prompt = system_prompt
self.agent_name = agent_name
self.agent_description = agent_description
- self.saved_state_path = f"{self.agent_name}_state.json"
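+        # A generated suffix keeps saved-state files from colliding when
+        # multiple agents share the same name.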
+ self.saved_state_path = f"{self.agent_name}_{generate_api_key(prefix='agent-')}_state.json"
self.autosave = autosave
self.response_filters = []
self.self_healing_enabled = self_healing_enabled
@@ -645,11 +646,6 @@ class Agent:
if self.conversation_schema
else False
),
- autosave=(
- self.conversation_schema.autosave
- if self.conversation_schema
- else False
- ),
time_enabled=(
self.conversation_schema.time_enabled
if self.conversation_schema
@@ -1081,6 +1077,18 @@ class Agent:
response, loop_count
)
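+                    # MCP tool handling runs only when both an MCP server
+                    # URL and local tools are configured; execute_tools then
+                    # runs on the response unconditionally.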
+ if exists(self.mcp_url) and exists(
+ self.tools
+ ):
+ self.mcp_tool_handling(
+ response, loop_count
+ )
+
+ self.execute_tools(
+ response=response,
+ loop_count=loop_count,
+ )
+
self.sentiment_and_evaluator(response)
success = True # Mark as successful to exit the retry loop
@@ -2382,7 +2390,7 @@ class Agent:
return None
def call_llm(
- self, task: str, img: str = None, *args, **kwargs
+ self, task: str, img: Optional[str] = None, *args, **kwargs
) -> str:
"""
Calls the appropriate method on the `llm` object based on the given task.
@@ -2404,7 +2412,12 @@ class Agent:
"""
try:
- out = self.llm.run(task=task, img=img, *args, **kwargs)
+ if img is not None:
+ out = self.llm.run(
+ task=task, img=img, *args, **kwargs
+ )
+ else:
+ out = self.llm.run(task=task, *args, **kwargs)
return out
except AgentLLMError as e:
diff --git a/swarms/structs/base_swarm.py b/swarms/structs/base_swarm.py
index c7f6b1f9..3d7b6e08 100644
--- a/swarms/structs/base_swarm.py
+++ b/swarms/structs/base_swarm.py
@@ -12,6 +12,7 @@ from typing import (
List,
Optional,
Sequence,
+ Union,
)
import yaml
@@ -76,7 +77,7 @@ class BaseSwarm(ABC):
self,
name: Optional[str] = None,
description: Optional[str] = None,
- agents: Optional[List[Agent]] = None,
+ agents: Optional[List[Union[Agent, Callable]]] = None,
models: Optional[List[Any]] = None,
max_loops: Optional[int] = 200,
callbacks: Optional[Sequence[callable]] = None,
diff --git a/swarms/structs/conversation.py b/swarms/structs/conversation.py
index 88c80a75..aae1acaa 100644
--- a/swarms/structs/conversation.py
+++ b/swarms/structs/conversation.py
@@ -1211,6 +1211,13 @@ class Conversation(BaseStructure):
save_enabled=True,
)
+    def return_dict_final(self):
+        """Return the final message as a dictionary."""
+        return self.conversation_history[-1]
+
@classmethod
def list_conversations(
cls, conversations_dir: Optional[str] = None
diff --git a/swarms/structs/deep_research_swarm.py b/swarms/structs/deep_research_swarm.py
index c2ae096b..188ac7ea 100644
--- a/swarms/structs/deep_research_swarm.py
+++ b/swarms/structs/deep_research_swarm.py
@@ -1,22 +1,14 @@
-import asyncio
import concurrent.futures
import json
import os
-from concurrent.futures import ThreadPoolExecutor, as_completed
-from datetime import datetime
-from pathlib import Path
-from typing import Any, Dict, List
+from typing import Any, List
-import aiohttp
from dotenv import load_dotenv
from rich.console import Console
-from rich.panel import Panel
-from rich.tree import Tree
+import requests
-from swarms.agents.reasoning_duo import ReasoningDuo
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
-from swarms.utils.any_to_str import any_to_str
from swarms.utils.formatter import formatter
from swarms.utils.history_output_formatter import (
history_output_formatter,
@@ -36,103 +28,19 @@ MAX_WORKERS = (
###############################################################################
-def format_exa_results(json_data: Dict[str, Any]) -> str:
- """Formats Exa.ai search results into structured text"""
- if "error" in json_data:
- return f"### Error\n{json_data['error']}\n"
-
- # Pre-allocate formatted_text list with initial capacity
- formatted_text = []
-
- # Extract search metadata
- search_params = json_data.get("effectiveFilters", {})
- query = search_params.get("query", "General web search")
- formatted_text.append(
- f"### Exa Search Results for: '{query}'\n\n---\n"
- )
-
- # Process results
- results = json_data.get("results", [])
-
- if not results:
- formatted_text.append("No results found.\n")
- return "".join(formatted_text)
-
- def process_result(
- result: Dict[str, Any], index: int
- ) -> List[str]:
- """Process a single result in a thread-safe manner"""
- title = result.get("title", "No title")
- url = result.get("url", result.get("id", "No URL"))
- published_date = result.get("publishedDate", "")
-
- # Handle highlights efficiently
- highlights = result.get("highlights", [])
- highlight_text = (
- "\n".join(
- (
- h.get("text", str(h))
- if isinstance(h, dict)
- else str(h)
- )
- for h in highlights[:3]
- )
- if highlights
- else "No summary available"
- )
-
- return [
- f"{index}. **{title}**\n",
- f" - URL: {url}\n",
- f" - Published: {published_date.split('T')[0] if published_date else 'Date unknown'}\n",
- f" - Key Points:\n {highlight_text}\n\n",
- ]
-
- # Process results concurrently
- with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
- future_to_result = {
- executor.submit(process_result, result, i + 1): i
- for i, result in enumerate(results)
- }
-
- # Collect results in order
- processed_results = [None] * len(results)
- for future in as_completed(future_to_result):
- idx = future_to_result[future]
- try:
- processed_results[idx] = future.result()
- except Exception as e:
- console.print(
- f"[bold red]Error processing result {idx + 1}: {str(e)}[/bold red]"
- )
- processed_results[idx] = [
- f"Error processing result {idx + 1}: {str(e)}\n"
- ]
-
- # Extend formatted text with processed results in correct order
- for result_text in processed_results:
- formatted_text.extend(result_text)
-
- return "".join(formatted_text)
-
-
-async def _async_exa_search(
- query: str, **kwargs: Any
-) -> Dict[str, Any]:
- """Asynchronous helper function for Exa.ai API requests"""
+def exa_search(query: str, **kwargs: Any) -> str:
+ """Performs web search using Exa.ai API and returns formatted results."""
api_url = "https://api.exa.ai/search"
-
- # Check if API key is available
api_key = os.getenv("EXA_API_KEY")
+
if not api_key:
- return {"error": "EXA_API_KEY environment variable not set"}
+ return "### Error\nEXA_API_KEY environment variable not set\n"
headers = {
"x-api-key": api_key,
"Content-Type": "application/json",
}
- # Filter out None keys AND None values from kwargs
safe_kwargs = {
str(k): v
for k, v in kwargs.items()
@@ -145,11 +53,10 @@ async def _async_exa_search(
"numResults": safe_kwargs.get("num_results", 10),
"contents": {
"text": True,
- "highlights": {"numSentences": 2},
+ "highlights": {"numSentences": 10},
},
}
- # Only add safe_kwargs if they don't conflict with existing keys
for key, value in safe_kwargs.items():
if key not in payload and key not in [
"query",
@@ -160,41 +67,58 @@ async def _async_exa_search(
payload[key] = value
try:
- async with aiohttp.ClientSession() as session:
- async with session.post(
- api_url, json=payload, headers=headers
- ) as response:
- if response.status != 200:
- return {
- "error": f"HTTP {response.status}: {await response.text()}"
- }
- return await response.json()
+ response = requests.post(
+ api_url, json=payload, headers=headers
+ )
+ if response.status_code != 200:
+ return f"### Error\nHTTP {response.status_code}: {response.text}\n"
+ json_data = response.json()
except Exception as e:
- return {"error": str(e)}
+ return f"### Error\n{str(e)}\n"
+ if "error" in json_data:
+ return f"### Error\n{json_data['error']}\n"
-def exa_search(query: str, **kwargs: Any) -> str:
- """Performs web search using Exa.ai API with concurrent processing"""
- try:
- # Run async search in the event loop
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
- try:
- response_json = loop.run_until_complete(
- _async_exa_search(query, **kwargs)
- )
- finally:
- loop.close()
+ formatted_text = []
+ search_params = json_data.get("effectiveFilters", {})
+ query = search_params.get("query", "General web search")
+ formatted_text.append(
+ f"### Exa Search Results for: '{query}'\n\n---\n"
+ )
- # Format results concurrently
- formatted_text = format_exa_results(response_json)
+ results = json_data.get("results", [])
+ if not results:
+ formatted_text.append("No results found.\n")
+ return "".join(formatted_text)
- return formatted_text
+ for i, result in enumerate(results, 1):
+ title = result.get("title", "No title")
+ url = result.get("url", result.get("id", "No URL"))
+ published_date = result.get("publishedDate", "")
+ highlights = result.get("highlights", [])
+ highlight_text = (
+ "\n".join(
+ (
+ h.get("text", str(h))
+ if isinstance(h, dict)
+ else str(h)
+ )
+ for h in highlights[:3]
+ )
+ if highlights
+ else "No summary available"
+ )
- except Exception as e:
- error_msg = f"Unexpected error: {str(e)}"
- console.print(f"[bold red]{error_msg}[/bold red]")
- return error_msg
+ formatted_text.extend(
+ [
+ f"{i}. **{title}**\n",
+ f" - URL: {url}\n",
+ f" - Published: {published_date.split('T')[0] if published_date else 'Date unknown'}\n",
+ f" - Key Points:\n {highlight_text}\n\n",
+ ]
+ )
+
+ return "".join(formatted_text)
# Define the research tools schema
@@ -310,6 +234,7 @@ class DeepResearchSwarm:
* 2, # Let the system decide optimal thread count
token_count: bool = False,
research_model_name: str = "gpt-4o-mini",
+ claude_summarization_model_name: str = "claude-3-5-sonnet-20240620",
):
self.name = name
self.description = description
@@ -318,6 +243,9 @@ class DeepResearchSwarm:
self.output_type = output_type
self.max_workers = max_workers
self.research_model_name = research_model_name
+ self.claude_summarization_model_name = (
+ claude_summarization_model_name
+ )
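+        # The two stages can use different models: query generation defaults
+        # to gpt-4o-mini, while the final report is summarized with Claude.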
self.reliability_check()
self.conversation = Conversation(token_count=token_count)
@@ -335,12 +263,17 @@ class DeepResearchSwarm:
system_prompt=RESEARCH_AGENT_PROMPT,
             max_loops=1,  # One pass is enough to generate the research queries
tools_list_dictionary=tools,
- model_name="gpt-4o-mini",
+ model_name=self.research_model_name,
+ output_type="final",
)
- self.reasoning_duo = ReasoningDuo(
+ self.summarization_agent = Agent(
+ agent_name="Summarization-Agent",
+ agent_description="Specialized agent for summarizing research results",
system_prompt=SUMMARIZATION_AGENT_PROMPT,
- output_type="string",
+ max_loops=1,
+ model_name=self.claude_summarization_model_name,
+ output_type="final",
)
def __del__(self):
@@ -372,45 +305,49 @@ class DeepResearchSwarm:
# Get the agent's response
agent_output = self.research_agent.run(query)
- self.conversation.add(
- role=self.research_agent.agent_name, content=agent_output
- )
+        # Parse the JSON string returned by the research agent; the result
+        # is a dictionary or a list of dictionaries
+        agent_output = json.loads(agent_output)
- # Convert the string output to dictionary
- output_dict = str_to_dict(agent_output)
+ formatter.print_panel(
+ f"Agent output type: {type(agent_output)} \n {agent_output}",
+ "blue",
+ )
- # Print the conversation history
- if self.nice_print:
- to_do_list = any_to_str(output_dict)
- formatter.print_panel(to_do_list, "blue")
- # Extract the detailed queries from the output
- if (
- isinstance(output_dict, dict)
- and "detailed_queries" in output_dict
- ):
- queries = output_dict["detailed_queries"]
- formatter.print_panel(
- f"Generated {len(queries)} queries", "blue"
+        # Normalize the parsed output (it may be a dict or a list of dicts)
+        output_dict = (
+            str_to_dict(agent_output)
+            if isinstance(agent_output, str)
+            else agent_output
+        )
- return queries
- return []
+        # Extract the detailed queries; the output may be a list of
+        # dictionaries or a single dictionary
+        queries = []
+        if isinstance(output_dict, list):
+            for item in output_dict:
+                if "detailed_queries" in item:
+                    queries = item["detailed_queries"]
+                    break
+        else:
+            queries = output_dict.get("detailed_queries", [])
+
+ # Log the number of queries generated
+ formatter.print_panel(
+ f"Generated {len(queries)} queries", "blue"
+ )
- def _process_query(self, query: str) -> str:
- """
- Process a single query with search only.
- This function is designed to be run in a separate thread.
- Args:
- query (str): The query to process
-
- Returns:
- str: Search results
- """
- # Run the search only - no individual reasoning to avoid duplication
- results = exa_search(query)
- return results
+ return queries
def step(self, query: str):
"""
@@ -426,21 +363,12 @@ class DeepResearchSwarm:
# Get all the queries to process
queries = self.get_queries(query)
- if not queries:
- error_msg = (
- "No queries generated. Please check your input."
- )
- self.conversation.add(
- role="System", content=error_msg
- )
- return history_output_formatter(
- self.conversation, type=self.output_type
- )
# Submit all queries for concurrent processing
futures = []
for q in queries:
- future = self.executor.submit(self._process_query, q)
+ future = self.executor.submit(exa_search, q)
futures.append((q, future))
# Process results as they complete
@@ -468,12 +396,12 @@ class DeepResearchSwarm:
# Generate final comprehensive analysis after all searches are complete
try:
- final_summary = self.reasoning_duo.run(
- f"Generate an extensive report of the following content: {self.conversation.get_str()}"
+ final_summary = self.summarization_agent.run(
+ f"Please generate a comprehensive 4,000-word report analyzing the following content: {self.conversation.get_str()}"
)
self.conversation.add(
- role=self.reasoning_duo.agent_name,
+ role=self.summarization_agent.agent_name,
content=final_summary,
)
except Exception as e:
@@ -535,144 +463,6 @@ class DeepResearchSwarm:
future = self.executor.submit(self.step, task)
futures.append((task, future))
- def parse_and_display_results(
- self, json_result: str, export_markdown: bool = True
- ):
- """
- Parse JSON results and display in rich format with optional markdown export.
-
- Args:
- json_result (str): JSON string containing conversation results
- export_markdown (bool): Whether to export to markdown file
- """
- try:
- # Parse JSON
- data = json.loads(json_result)
-
- # Create rich display
- console.print("\n" + "=" * 100, style="cyan")
- console.print(
- "š¬ DEEP RESEARCH RESULTS",
- style="bold cyan",
- justify="center",
- )
- console.print("=" * 100, style="cyan")
-
- # Create conversation tree
- tree = Tree("š£ļø Research Conversation", style="bold blue")
- markdown_content = [
- "# Deep Research Results\n",
- f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n",
- ]
-
- for i, entry in enumerate(data, 1):
- if isinstance(entry, dict):
- role = entry.get("role", "Unknown")
- content = entry.get("content", "")
- timestamp = entry.get("timestamp", "")
-
- # Get role info for display
- role_info = self._get_role_display_info(role)
-
- # Create tree branch
- branch_text = f"{role_info['emoji']} {role}"
- if timestamp:
- time_part = (
- timestamp.split()[-1]
- if " " in timestamp
- else timestamp[-8:]
- )
- branch_text += f" ({time_part})"
-
- branch = tree.add(
- branch_text, style=role_info["style"]
- )
-
- # Add content preview to tree
- content_preview = (
- content[:150] + "..."
- if len(content) > 150
- else content
- )
- content_preview = content_preview.replace(
- "\n", " "
- )
- branch.add(content_preview, style="dim")
-
- # Add to markdown
- markdown_content.append(f"\n## {i}. {role}")
- if timestamp:
- markdown_content.append(
- f"**Timestamp:** {timestamp}"
- )
- markdown_content.append(f"\n{content}\n")
-
- # Display full content for important entries
- if (
- role.lower() in ["reasoning-agent-01"]
- and len(content) > 300
- ):
- console.print(
- f"\nš {role} Full Response:",
- style="bold green",
- )
- console.print(
- Panel(
- content,
- border_style="green",
- title=f"{role} Analysis",
- )
- )
-
- # Display the tree
- console.print(tree)
-
- # Export to markdown if requested
- if export_markdown:
- # Create deepsearch_results directory
- results_dir = Path("deepsearch_results")
- results_dir.mkdir(exist_ok=True)
-
- # Generate filename with timestamp
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- filename = (
- results_dir / f"research_results_{timestamp}.md"
- )
-
- # Write markdown file
- with open(filename, "w", encoding="utf-8") as f:
- f.write("\n".join(markdown_content))
-
- console.print(
- f"\nš¾ Results exported to: {filename}",
- style="bold green",
- )
-
- console.print(
- "\nā
Research analysis complete!", style="bold cyan"
- )
-
- except json.JSONDecodeError as e:
- console.print(f"ā Error parsing JSON: {e}", style="red")
- except Exception as e:
- console.print(
- f"ā Error displaying results: {e}", style="red"
- )
-
- def _get_role_display_info(self, role: str) -> Dict[str, str]:
- """Get display information for different conversation roles."""
- role_map = {
- "user": {"emoji": "š¤", "style": "cyan"},
- "deep-research-agent": {"emoji": "š", "style": "blue"},
- "reasoning-agent-01": {"emoji": "š§ ", "style": "magenta"},
- "system": {"emoji": "āļø", "style": "yellow"},
- }
-
- role_lower = role.lower()
- return role_map.get(
- role_lower, {"emoji": "š¤", "style": "white"}
- )
-
# Example usage
# if __name__ == "__main__":
diff --git a/swarms/structs/multi_agent_router.py b/swarms/structs/multi_agent_router.py
index d496a5f0..c8a71b16 100644
--- a/swarms/structs/multi_agent_router.py
+++ b/swarms/structs/multi_agent_router.py
@@ -22,6 +22,7 @@ from swarms.utils.history_output_formatter import (
history_output_formatter,
)
from swarms.utils.formatter import formatter
+from typing import Callable, Union
class AgentResponse(BaseModel):
@@ -59,7 +60,7 @@ class MultiAgentRouter:
self,
name: str = "swarm-router",
description: str = "Routes tasks to specialized agents based on their capabilities",
- agents: List[Agent] = [],
+ agents: List[Union[Agent, Callable]] = [],
model: str = "gpt-4o-mini",
temperature: float = 0.1,
shared_memory_system: callable = None,
diff --git a/swarms/structs/swarm_matcher.py b/swarms/structs/swarm_matcher.py
index 6d32d8d2..ce0e9e81 100644
--- a/swarms/structs/swarm_matcher.py
+++ b/swarms/structs/swarm_matcher.py
@@ -1,8 +1,11 @@
import json
-from typing import List, Optional, Tuple
+import os
+from typing import Dict, List, Literal, Optional, Tuple, Union
-from pydantic import BaseModel, Field
-from tenacity import retry, stop_after_attempt, wait_exponential
+import numpy as np
+from pydantic import BaseModel, Field, ValidationInfo, field_validator
+from litellm import embedding
from swarms.utils.auto_download_check_packages import (
auto_check_and_download_package,
@@ -20,18 +23,75 @@ class SwarmType(BaseModel):
)
+api_key = os.getenv("OPENAI_API_KEY")
+
+
class SwarmMatcherConfig(BaseModel):
- model_name: str = "sentence-transformers/all-MiniLM-L6-v2"
- embedding_dim: int = (
- 512 # Dimension of the sentence-transformers model
+ backend: Literal["local", "openai"] = "local"
+ model_name: str = (
+ "sentence-transformers/all-MiniLM-L6-v2" # For local embeddings
+ )
+ openai_model: str = (
+ "text-embedding-3-small" # Default to newer OpenAI model
+ )
+ embedding_dim: int = 512 # For local embeddings
+ openai_dimensions: Optional[int] = (
+ None # For OpenAI text-embedding-3-* models
)
+ similarity_threshold: float = Field(default=0.5, ge=0.0, le=1.0)
+ cache_embeddings: bool = True
+ max_sequence_length: int = Field(default=512, ge=64, le=2048)
+ device: str = "cpu" # Only used for local embeddings
+ batch_size: int = Field(default=32, ge=1)
+ openai_api_key: Optional[str] = os.getenv("OPENAI_API_KEY")
+ metadata: Optional[Dict] = Field(
+ default_factory=dict
+ ) # For OpenAI embedding calls
+
+    model_config = {"validate_assignment": True}
+
+ @validator("openai_dimensions")
+ def validate_dimensions(cls, v, values):
+ if values.get("backend") == "openai":
+ if (
+ values.get("openai_model", "").startswith(
+ "text-embedding-3"
+ )
+ and v is None
+ ):
+ # Default to 1536 for text-embedding-3-small/large if not specified
+ return 1536
+ return v
+
+ @field_validator("openai_model")
+ def validate_model(cls, v, values):
+ if values.get("backend") == "openai":
+ valid_models = [
+ "text-embedding-3-small",
+ "text-embedding-3-large",
+ "text-embedding-ada-002",
+ ]
+ if v not in valid_models:
+ raise ValueError(
+ f"OpenAI model must be one of: {', '.join(valid_models)}"
+ )
+ return v
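+
+    # Example: SwarmMatcherConfig(backend="openai", openai_model="gpt-4o")
+    # raises a ValueError, since only the embedding models listed above are
+    # accepted for the OpenAI backend.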
class SwarmMatcher:
"""
- A class for matching tasks to swarm types based on their descriptions.
- It utilizes a transformer model to generate embeddings for task and swarm type descriptions,
- and then calculates the dot product to find the best match.
+ A class for matching tasks to swarm types based on their descriptions using semantic similarity.
+
+ This class uses transformer models to generate embeddings for both task descriptions and swarm type descriptions.
+ It then calculates similarity scores to find the most appropriate swarm type for a given task.
+
+ Features:
+ - Supports both local transformer models and OpenAI embeddings
+ - Implements embedding caching for improved performance
+ - Provides batch processing capabilities
+ - Supports saving/loading swarm type configurations
"""
def __init__(self, config: SwarmMatcherConfig):
@@ -39,15 +99,35 @@ class SwarmMatcher:
Initializes the SwarmMatcher with a configuration.
Args:
- config (SwarmMatcherConfig): The configuration for the SwarmMatcher.
+ config (SwarmMatcherConfig): Configuration object specifying model settings,
+ similarity thresholds, and other parameters.
+
+ Raises:
+ ImportError: If required dependencies (torch, transformers) are not available
+ Exception: If model initialization fails
"""
+ try:
+ self.config = config
+ if self.config.backend == "local":
+ transformers = self._setup_dependencies()
+ self._setup_model_and_tokenizer(transformers)
+ self._initialize_state()
+ self.initialize_swarm_types()
+ logger.debug("SwarmMatcher initialized successfully")
+ except Exception as e:
+ logger.error(f"Error initializing SwarmMatcher: {str(e)}")
+ raise
+ def _setup_dependencies(self):
+ """Set up required dependencies for the SwarmMatcher."""
try:
+ import numpy as np
import torch
except ImportError:
auto_check_and_download_package(
"torch", package_manager="pip", upgrade=True
)
+ import numpy as np
import torch
try:
@@ -59,65 +139,279 @@ class SwarmMatcher:
import transformers
self.torch = torch
+ self.np = np
+ return transformers
+
+ def _setup_model_and_tokenizer(self, transformers):
+ """Initialize the model and tokenizer."""
+ self.device = self.torch.device(self.config.device)
+ self.tokenizer = transformers.AutoTokenizer.from_pretrained(
+ self.config.model_name
+ )
+ self.model = transformers.AutoModel.from_pretrained(
+ self.config.model_name
+ ).to(self.device)
+
+ def _initialize_state(self):
+ """Initialize internal state variables."""
+ self.swarm_types: List[SwarmType] = []
+ self._embedding_cache = (
+ {} if self.config.cache_embeddings else None
+ )
+
+ def _get_cached_embedding(
+ self, text: str
+ ) -> Optional[np.ndarray]:
+ """
+ Retrieves a cached embedding if available.
+
+ Args:
+ text (str): The text to look up in the cache
+
+ Returns:
+ Optional[np.ndarray]: The cached embedding if found, None otherwise
+ """
+ if self._embedding_cache is not None:
+ return self._embedding_cache.get(text)
+ return None
+
+ def _cache_embedding(self, text: str, embedding: np.ndarray):
+ """
+ Stores an embedding in the cache for future use.
+
+ Args:
+ text (str): The text associated with the embedding
+ embedding (np.ndarray): The embedding vector to cache
+ """
+ if self._embedding_cache is not None:
+ self._embedding_cache[text] = embedding
+
+ def _get_openai_embedding(self, text: str) -> np.ndarray:
+ """Get embedding using OpenAI's API via litellm."""
try:
- self.config = config
- self.tokenizer = (
- transformers.AutoTokenizer.from_pretrained(
- config.model_name
+ params = {
+ "model": self.config.openai_model,
+ "input": [text],
+ }
+
+ # Add dimensions parameter for text-embedding-3-* models
+ if (
+ self.config.openai_model.startswith(
+ "text-embedding-3"
)
- )
- self.model = transformers.AutoModel.from_pretrained(
- config.model_name
- )
- self.swarm_types: List[SwarmType] = []
- logger.debug("SwarmMatcher initialized successfully")
+ and self.config.openai_dimensions
+ ):
+ params["dimensions"] = self.config.openai_dimensions
+
+ response = embedding(**params)
+ response = response.model_dump()
+
+ # Handle the response format
+ if "data" in response and len(response["data"]) > 0:
+ embedding_data = response["data"][0]["embedding"]
+ else:
+ raise ValueError(
+ f"Unexpected response format from OpenAI API: {response}"
+ )
+
+ embedding_array = np.array(embedding_data)
+
+ # Log usage information if available
+ if "usage" in response:
+ logger.debug(
+ f"OpenAI API usage - Prompt tokens: {response['usage'].get('prompt_tokens', 'N/A')}, "
+ f"Total tokens: {response['usage'].get('total_tokens', 'N/A')}"
+ )
+
+ return embedding_array
except Exception as e:
- logger.error(f"Error initializing SwarmMatcher: {str(e)}")
+ logger.error(f"Error getting OpenAI embedding: {str(e)}")
raise
- @retry(
- stop=stop_after_attempt(3),
- wait=wait_exponential(multiplier=1, min=4, max=10),
- )
- def get_embedding(self, text: str):
+ def _get_openai_embeddings_batch(
+ self, texts: List[str]
+ ) -> np.ndarray:
+ """Get embeddings for a batch of texts using OpenAI's API via litellm."""
+ try:
+ params = {
+ "model": self.config.openai_model,
+ "input": texts,
+ }
+
+ # Add dimensions parameter for text-embedding-3-* models
+ if (
+ self.config.openai_model.startswith(
+ "text-embedding-3"
+ )
+ and self.config.openai_dimensions
+ ):
+ params["dimensions"] = self.config.openai_dimensions
+
+ response = embedding(**params)
+ response = response.model_dump()
+
+ # Handle the response format
+ if "data" in response:
+ embeddings = [
+ data["embedding"] for data in response["data"]
+ ]
+ else:
+ raise ValueError(
+ f"Unexpected response format from OpenAI API: {response}"
+ )
+
+ # Log usage information if available
+ if "usage" in response:
+ logger.debug(
+ f"Batch OpenAI API usage - Prompt tokens: {response['usage'].get('prompt_tokens', 'N/A')}, "
+ f"Total tokens: {response['usage'].get('total_tokens', 'N/A')}"
+ )
+
+ return np.array(embeddings)
+ except Exception as e:
+ logger.error(
+ f"Error getting OpenAI embeddings batch: {str(e)}"
+ )
+ raise
+
+ def get_embedding(self, text: str) -> np.ndarray:
"""
Generates an embedding for a given text using the configured model.
+ This method first checks the cache for an existing embedding. If not found,
+ it generates a new embedding using either the local transformer model or OpenAI API.
+
Args:
- text (str): The text for which to generate an embedding.
+ text (str): The text for which to generate an embedding
Returns:
- np.ndarray: The embedding vector for the text.
+ np.ndarray: The embedding vector for the text
+
+ Raises:
+ Exception: If embedding generation fails
"""
+ # Check cache first
+ cached_embedding = self._get_cached_embedding(text)
+ if cached_embedding is not None:
+ return cached_embedding
logger.debug(f"Getting embedding for text: {text[:50]}...")
try:
- inputs = self.tokenizer(
- text,
- return_tensors="pt",
- padding=True,
- truncation=True,
- max_length=512,
- )
- with self.torch.no_grad():
- outputs = self.model(**inputs)
- embedding = (
- outputs.last_hidden_state.mean(dim=1)
- .squeeze()
- .numpy()
- )
+ if self.config.backend == "openai":
+ embedding = self._get_openai_embedding(text)
+ else:
+ inputs = self.tokenizer(
+ text,
+ return_tensors="pt",
+ padding=True,
+ truncation=True,
+ max_length=self.config.max_sequence_length,
+ )
+ # Move inputs to device
+ inputs = {
+ k: v.to(self.device) for k, v in inputs.items()
+ }
+
+ with self.torch.no_grad():
+ outputs = self.model(**inputs)
+ embedding = (
+ outputs.last_hidden_state.mean(dim=1)
+ .squeeze()
+ .cpu()
+ .numpy()
+ )
+
+ # Cache the embedding
+ self._cache_embedding(text, embedding)
+
logger.debug("Embedding generated successfully")
return embedding
except Exception as e:
logger.error(f"Error generating embedding: {str(e)}")
raise
+ def get_embeddings_batch(self, texts: List[str]) -> np.ndarray:
+ """
+ Generate embeddings for multiple texts in batch for improved efficiency.
+
+ This method processes texts in batches, utilizing the cache where possible
+ and generating new embeddings only for uncached texts.
+
+ Args:
+ texts (List[str]): List of texts to generate embeddings for
+
+ Returns:
+ np.ndarray: Array of embeddings, one for each input text
+
+ Raises:
+ Exception: If batch processing fails
+ """
+        # Pre-fill from the cache so the output order always matches the
+        # input order; only uncached texts are sent for embedding.
+        embeddings = [self._get_cached_embedding(t) for t in texts]
+        uncached = [
+            (i, t)
+            for i, (t, e) in enumerate(zip(texts, embeddings))
+            if e is None
+        ]
+        batch_texts = [t for _, t in uncached]
+
+        if batch_texts:
+            if self.config.backend == "openai":
+                new_embeddings = list(
+                    self._get_openai_embeddings_batch(batch_texts)
+                )
+            else:
+                new_embeddings = []
+                for i in range(
+                    0, len(batch_texts), self.config.batch_size
+                ):
+                    batch = batch_texts[
+                        i : i + self.config.batch_size
+                    ]
+                    inputs = self.tokenizer(
+                        batch,
+                        return_tensors="pt",
+                        padding=True,
+                        truncation=True,
+                        max_length=self.config.max_sequence_length,
+                    )
+                    inputs = {
+                        k: v.to(self.device)
+                        for k, v in inputs.items()
+                    }
+
+                    with self.torch.no_grad():
+                        outputs = self.model(**inputs)
+                        new_embeddings.extend(
+                            outputs.last_hidden_state.mean(dim=1)
+                            .cpu()
+                            .numpy()
+                        )
+
+            # Write freshly computed embeddings back into their original
+            # positions and cache them for future lookups.
+            for (idx, text), emb in zip(uncached, new_embeddings):
+                self._cache_embedding(text, emb)
+                embeddings[idx] = emb
+
+        return np.array(embeddings)
+
def add_swarm_type(self, swarm_type: SwarmType):
"""
- Adds a swarm type to the list of swarm types, generating an embedding for its description.
+ Adds a swarm type to the matcher's registry.
+
+ Generates and stores an embedding for the swarm type's description.
Args:
- swarm_type (SwarmType): The swarm type to add.
+ swarm_type (SwarmType): The swarm type to add
+
+ Raises:
+ Exception: If embedding generation or storage fails
"""
logger.debug(f"Adding swarm type: {swarm_type.name}")
try:
@@ -133,38 +427,104 @@ class SwarmMatcher:
def find_best_match(self, task: str) -> Tuple[str, float]:
"""
- Finds the best match for a given task among the registered swarm types.
+ Finds the best matching swarm type for a given task.
+
+ Uses semantic similarity to compare the task against all registered swarm types
+ and returns the best match along with its confidence score.
Args:
- task (str): The task for which to find the best match.
+ task (str): The task description to match
Returns:
- Tuple[str, float]: A tuple containing the name of the best matching swarm type and the score.
- """
- import numpy as np
+ Tuple[str, float]: A tuple containing:
+ - The name of the best matching swarm type
+ - The similarity score (between 0 and 1)
+ Raises:
+ Exception: If matching process fails
+ """
logger.debug(f"Finding best match for task: {task[:50]}...")
try:
task_embedding = self.get_embedding(task)
best_match = None
best_score = -float("inf")
- for swarm_type in self.swarm_types:
- score = np.dot(
- task_embedding, np.array(swarm_type.embedding)
+
+ # Get all swarm type embeddings in batch
+ swarm_descriptions = [
+ st.description for st in self.swarm_types
+ ]
+ swarm_embeddings = self.get_embeddings_batch(
+ swarm_descriptions
+ )
+
+ # Calculate similarity scores in batch
+ scores = np.dot(task_embedding, swarm_embeddings.T)
+ best_idx = np.argmax(scores)
+ best_score = float(scores[best_idx])
+ best_match = self.swarm_types[best_idx]
+
+ if best_score < self.config.similarity_threshold:
+ logger.warning(
+ f"Best match score {best_score} is below threshold {self.config.similarity_threshold}"
)
- if score > best_score:
- best_score = score
- best_match = swarm_type
+
logger.info(
f"Best match for task: {best_match.name} (score: {best_score})"
)
- return best_match.name, float(best_score)
+ return best_match.name, best_score
except Exception as e:
logger.error(
f"Error finding best match for task: {str(e)}"
)
raise
+ def find_top_k_matches(
+ self, task: str, k: int = 3
+ ) -> List[Tuple[str, float]]:
+ """
+ Finds the top k matching swarm types for a given task.
+
+ Returns all matches that exceed the similarity threshold, sorted by score.
+
+ Args:
+ task (str): The task for which to find matches.
+ k (int): Number of top matches to return.
+
+ Returns:
+ List[Tuple[str, float]]: List of tuples containing swarm names and their scores.
+ """
+ logger.debug(
+ f"Finding top {k} matches for task: {task[:50]}..."
+ )
+ try:
+ task_embedding = self.get_embedding(task)
+ swarm_descriptions = [
+ st.description for st in self.swarm_types
+ ]
+ swarm_embeddings = self.get_embeddings_batch(
+ swarm_descriptions
+ )
+
+ # Calculate similarity scores in batch
+ scores = np.dot(task_embedding, swarm_embeddings.T)
+ top_k_indices = np.argsort(scores)[-k:][::-1]
+
+ results = []
+ for idx in top_k_indices:
+ score = float(scores[idx])
+ if score >= self.config.similarity_threshold:
+ results.append(
+ (self.swarm_types[idx].name, score)
+ )
+
+ logger.info(
+ f"Found {len(results)} matches above threshold"
+ )
+ return results
+ except Exception as e:
+ logger.error(f"Error finding top matches: {str(e)}")
+ raise
+
def auto_select_swarm(self, task: str) -> str:
"""
Automatically selects the best swarm type for a given task based on their descriptions.
@@ -226,56 +586,86 @@ class SwarmMatcher:
logger.error(f"Error loading swarm types: {str(e)}")
raise
+ def initialize_swarm_types(self):
+ logger.debug("Initializing swarm types")
+ swarm_types = [
+ SwarmType(
+ name="AgentRearrange",
+ description="Optimize agent order and rearrange flow for multi-step tasks, ensuring efficient task allocation and minimizing bottlenecks. Keywords: orchestration, coordination, pipeline optimization, task scheduling, resource allocation, workflow management, agent organization, process optimization",
+ ),
+ SwarmType(
+ name="MixtureOfAgents",
+ description="Combine diverse expert agents for comprehensive analysis, fostering a collaborative approach to problem-solving and leveraging individual strengths. Keywords: multi-agent system, expert collaboration, distributed intelligence, collective problem solving, agent specialization, team coordination, hybrid approaches, knowledge synthesis",
+ ),
+ SwarmType(
+ name="SpreadSheetSwarm",
+ description="Collaborative data processing and analysis in a spreadsheet-like environment, facilitating real-time data sharing and visualization. Keywords: data analysis, tabular processing, collaborative editing, data transformation, spreadsheet operations, data visualization, real-time collaboration, structured data",
+ ),
+ SwarmType(
+ name="SequentialWorkflow",
+ description="Execute tasks in a step-by-step, sequential process workflow, ensuring a logical and methodical approach to task execution. Keywords: linear processing, waterfall methodology, step-by-step execution, ordered tasks, sequential operations, process flow, systematic approach, staged execution",
+ ),
+ SwarmType(
+ name="ConcurrentWorkflow",
+ description="Process multiple tasks or data sources concurrently in parallel, maximizing productivity and reducing processing time. Keywords: parallel processing, multi-threading, asynchronous execution, distributed computing, concurrent operations, simultaneous tasks, parallel workflows, scalable processing",
+ ),
+ SwarmType(
+ name="HierarchicalSwarm",
+ description="Organize agents in a hierarchical structure with clear reporting lines and delegation of responsibilities. Keywords: management hierarchy, organizational structure, delegation, supervision, chain of command, tiered organization, structured coordination, leadership roles",
+ ),
+ SwarmType(
+ name="AdaptiveSwarm",
+ description="Dynamically adjust agent behavior and swarm configuration based on task requirements and performance feedback. Keywords: dynamic adaptation, self-optimization, feedback loops, learning systems, flexible configuration, responsive behavior, adaptive algorithms, real-time adjustment",
+ ),
+ SwarmType(
+ name="ConsensusSwarm",
+ description="Achieve group decisions through consensus mechanisms and voting protocols among multiple agents. Keywords: group decision making, voting systems, collective intelligence, agreement protocols, democratic processes, collaborative decisions, consensus building",
+ ),
+ SwarmType(
+ name="DeepResearchSwarm",
+ description="Conduct in-depth research and analysis by coordinating multiple agents to explore, synthesize, and validate information from various sources. Keywords: research methodology, information synthesis, data validation, comprehensive analysis, knowledge discovery, systematic investigation",
+ ),
+ SwarmType(
+ name="CouncilAsAJudge",
+ description="Evaluate and judge solutions or decisions through a council of expert agents acting as arbitrators. Keywords: evaluation, judgment, arbitration, expert assessment, quality control, decision validation, peer review, consensus building",
+ ),
+ SwarmType(
+ name="MALT",
+ description="Multi-Agent Language Tasks framework for coordinating language-based operations across multiple specialized agents. Keywords: language processing, task coordination, linguistic analysis, communication protocols, semantic understanding, natural language tasks",
+ ),
+ SwarmType(
+ name="GroupChat",
+ description="Enable dynamic multi-agent conversations and collaborative problem-solving through structured group discussions. Keywords: collaborative dialogue, group interaction, team communication, collective problem-solving, discussion facilitation, knowledge sharing",
+ ),
+ SwarmType(
+ name="MultiAgentRouter",
+ description="Intelligently route tasks and information between agents based on their specializations and current workload. Keywords: task distribution, load balancing, intelligent routing, agent specialization, workflow optimization, resource allocation",
+ ),
+ SwarmType(
+ name="MajorityVoting",
+ description="Make decisions through democratic voting mechanisms where multiple agents contribute their opinions and votes. Keywords: collective decision-making, democratic process, vote aggregation, opinion pooling, consensus building, collaborative choice",
+ ),
+ ]
+
+ try:
+ for swarm_type in swarm_types:
+ self.add_swarm_type(swarm_type)
+ except Exception as e:
+ logger.error(f"Error initializing swarm types: {str(e)}")
+ raise
+
-def initialize_swarm_types(matcher: SwarmMatcher):
- logger.debug("Initializing swarm types")
- swarm_types = [
- SwarmType(
- name="AgentRearrange",
- description="Optimize agent order and rearrange flow for multi-step tasks, ensuring efficient task allocation and minimizing bottlenecks. Keywords: orchestration, coordination, pipeline optimization, task scheduling, resource allocation, workflow management, agent organization, process optimization",
- ),
- SwarmType(
- name="MixtureOfAgents",
- description="Combine diverse expert agents for comprehensive analysis, fostering a collaborative approach to problem-solving and leveraging individual strengths. Keywords: multi-agent system, expert collaboration, distributed intelligence, collective problem solving, agent specialization, team coordination, hybrid approaches, knowledge synthesis",
- ),
- SwarmType(
- name="SpreadSheetSwarm",
- description="Collaborative data processing and analysis in a spreadsheet-like environment, facilitating real-time data sharing and visualization. Keywords: data analysis, tabular processing, collaborative editing, data transformation, spreadsheet operations, data visualization, real-time collaboration, structured data",
- ),
- SwarmType(
- name="SequentialWorkflow",
- description="Execute tasks in a step-by-step, sequential process workflow, ensuring a logical and methodical approach to task execution. Keywords: linear processing, waterfall methodology, step-by-step execution, ordered tasks, sequential operations, process flow, systematic approach, staged execution",
- ),
- SwarmType(
- name="ConcurrentWorkflow",
- description="Process multiple tasks or data sources concurrently in parallel, maximizing productivity and reducing processing time. Keywords: parallel processing, multi-threading, asynchronous execution, distributed computing, concurrent operations, simultaneous tasks, parallel workflows, scalable processing",
- ),
- # SwarmType(
- # name="HierarchicalSwarm",
- # description="Organize agents in a hierarchical structure with clear reporting lines and delegation of responsibilities. Keywords: management hierarchy, organizational structure, delegation, supervision, chain of command, tiered organization, structured coordination",
- # ),
- # SwarmType(
- # name="AdaptiveSwarm",
- # description="Dynamically adjust agent behavior and swarm configuration based on task requirements and performance feedback. Keywords: dynamic adaptation, self-optimization, feedback loops, learning systems, flexible configuration, responsive behavior, adaptive algorithms",
- # ),
- # SwarmType(
- # name="ConsensusSwarm",
- # description="Achieve group decisions through consensus mechanisms and voting protocols among multiple agents. Keywords: group decision making, voting systems, collective intelligence, agreement protocols, democratic processes, collaborative decisions",
- # ),
- ]
-
- for swarm_type in swarm_types:
- matcher.add_swarm_type(swarm_type)
- logger.debug("Swarm types initialized")
-
-
-def swarm_matcher(task: str, *args, **kwargs):
+def swarm_matcher(task: Union[str, List[str]], *args, **kwargs):
"""
Runs the SwarmMatcher example with predefined tasks and swarm types.
"""
+    if isinstance(task, list):
+        task = " ".join(task)
+
config = SwarmMatcherConfig()
matcher = SwarmMatcher(config)
- initialize_swarm_types(matcher)
# matcher.save_swarm_types(f"swarm_logs/{uuid4().hex}.json")
@@ -286,319 +676,18 @@ def swarm_matcher(task: str, *args, **kwargs):
return swarm_type
-# from typing import List, Tuple, Dict
-# from pydantic import BaseModel, Field
-# from loguru import logger
-# from uuid import uuid4
-# import chromadb
-# import json
-# from tenacity import retry, stop_after_attempt, wait_exponential
-
-
-# class SwarmType(BaseModel):
-# """A swarm type with its name, description and optional metadata"""
-
-# id: str = Field(default_factory=lambda: str(uuid4()))
-# name: str
-# description: str
-# metadata: Dict = Field(default_factory=dict)
-
-
-# class SwarmMatcherConfig(BaseModel):
-# """Configuration for the SwarmMatcher"""
-
-# collection_name: str = "swarm_types"
-# distance_metric: str = "cosine" # or "l2" or "ip"
-# embedding_function: str = (
-# "sentence-transformers/all-mpnet-base-v2" # Better model than MiniLM
-# )
-# persist_directory: str = "./chroma_db"
-
-
-# class SwarmMatcher:
-# """
-# An improved swarm matcher that uses ChromaDB for better vector similarity search.
-# Features:
-# - Persistent storage of embeddings
-# - Better vector similarity search with multiple distance metrics
-# - Improved embedding model
-# - Metadata filtering capabilities
-# - Batch operations support
-# """
-
-# def __init__(self, config: SwarmMatcherConfig):
-# """Initialize the improved swarm matcher"""
-# logger.add("swarm_matcher.log", rotation="100 MB")
-# self.config = config
-
-# # Initialize ChromaDB client with persistence
-# self.chroma_client = chromadb.Client()
-
-# # Get or create collection
-# try:
-# self.collection = self.chroma_client.get_collection(
-# name=config.collection_name,
-# )
-# except ValueError:
-# self.collection = self.chroma_client.create_collection(
-# name=config.collection_name,
-# metadata={"hnsw:space": config.distance_metric},
-# )
-
-# logger.info(
-# f"Initialized SwarmMatcher with collection '{config.collection_name}'"
-# )
-
-# def add_swarm_type(self, swarm_type: SwarmType) -> None:
-# """Add a single swarm type to the collection"""
-# try:
-# self.collection.add(
-# ids=[swarm_type.id],
-# documents=[swarm_type.description],
-# metadatas=[
-# {"name": swarm_type.name, **swarm_type.metadata}
-# ],
-# )
-# logger.info(f"Added swarm type: {swarm_type.name}")
-# except Exception as e:
-# logger.error(
-# f"Error adding swarm type {swarm_type.name}: {str(e)}"
-# )
-# raise
-
-# def add_swarm_types(self, swarm_types: List[SwarmType]) -> None:
-# """Add multiple swarm types in batch"""
-# try:
-# self.collection.add(
-# ids=[st.id for st in swarm_types],
-# documents=[st.description for st in swarm_types],
-# metadatas=[
-# {"name": st.name, **st.metadata}
-# for st in swarm_types
-# ],
-# )
-# logger.info(f"Added {len(swarm_types)} swarm types")
-# except Exception as e:
-# logger.error(
-# f"Error adding swarm types in batch: {str(e)}"
-# )
-# raise
-
-# @retry(
-# stop=stop_after_attempt(3),
-# wait=wait_exponential(multiplier=1, min=4, max=10),
-# )
-# def find_best_matches(
-# self,
-# task: str,
-# n_results: int = 3,
-# score_threshold: float = 0.7,
-# ) -> List[Tuple[str, float]]:
-# """
-# Find the best matching swarm types for a given task
-# Returns multiple matches with their scores
-# """
-# try:
-# results = self.collection.query(
-# query_texts=[task],
-# n_results=n_results,
-# include=["metadatas", "distances"],
-# )
-
-# matches = []
-# for metadata, distance in zip(
-# results["metadatas"][0], results["distances"][0]
-# ):
-# # Convert distance to similarity score (1 - normalized_distance)
-# score = 1 - (
-# distance / 2
-# ) # Normalize cosine distance to [0,1]
-# if score >= score_threshold:
-# matches.append((metadata["name"], score))
-
-# logger.info(f"Found {len(matches)} matches for task")
-# return matches
-
-# except Exception as e:
-# logger.error(f"Error finding matches for task: {str(e)}")
-# raise
-
-# def auto_select_swarm(self, task: str) -> str:
-# """
-# Automatically select the best swarm type for a task
-# Returns only the top match
-# """
-# matches = self.find_best_matches(task, n_results=1)
-# if not matches:
-# logger.warning("No suitable matches found for task")
-# return "SequentialWorkflow" # Default fallback
-
-# best_match, score = matches[0]
-# logger.info(
-# f"Selected swarm type '{best_match}' with confidence {score:.3f}"
-# )
-# return best_match
-
-# def run_multiple(self, tasks: List[str]) -> List[str]:
-# """Process multiple tasks in batch"""
-# return [self.auto_select_swarm(task) for task in tasks]
-
-# def save_swarm_types(self, filename: str) -> None:
-# """Export swarm types to JSON"""
-# try:
-# all_data = self.collection.get(
-# include=["metadatas", "documents"]
-# )
-# swarm_types = [
-# SwarmType(
-# id=id_,
-# name=metadata["name"],
-# description=document,
-# metadata={
-# k: v
-# for k, v in metadata.items()
-# if k != "name"
-# },
-# )
-# for id_, metadata, document in zip(
-# all_data["ids"],
-# all_data["metadatas"],
-# all_data["documents"],
-# )
-# ]
-
-# with open(filename, "w") as f:
-# json.dump(
-# [st.dict() for st in swarm_types], f, indent=2
-# )
-# logger.info(f"Saved swarm types to {filename}")
-# except Exception as e:
-# logger.error(f"Error saving swarm types: {str(e)}")
-# raise
-
-# def load_swarm_types(self, filename: str) -> None:
-# """Import swarm types from JSON"""
-# try:
-# with open(filename, "r") as f:
-# swarm_types_data = json.load(f)
-# swarm_types = [SwarmType(**st) for st in swarm_types_data]
-# self.add_swarm_types(swarm_types)
-# logger.info(f"Loaded swarm types from {filename}")
-# except Exception as e:
-# logger.error(f"Error loading swarm types: {str(e)}")
-# raise
-
-
-# def initialize_default_swarm_types(matcher: SwarmMatcher) -> None:
-# """Initialize the matcher with default swarm types"""
-# swarm_types = [
-# SwarmType(
-# name="AgentRearrange",
-# description="""
-# Optimize agent order and rearrange flow for multi-step tasks, ensuring efficient task allocation
-# and minimizing bottlenecks. Specialized in orchestration, coordination, pipeline optimization,
-# task scheduling, resource allocation, workflow management, agent organization, and process optimization.
-# Best for tasks requiring complex agent interactions and workflow optimization.
-# """,
-# metadata={
-# "category": "optimization",
-# "complexity": "high",
-# },
-# ),
-# SwarmType(
-# name="MixtureOfAgents",
-# description="""
-# Combine diverse expert agents for comprehensive analysis, fostering a collaborative approach
-# to problem-solving and leveraging individual strengths. Focuses on multi-agent systems,
-# expert collaboration, distributed intelligence, collective problem solving, agent specialization,
-# team coordination, hybrid approaches, and knowledge synthesis. Ideal for complex problems
-# requiring multiple areas of expertise.
-# """,
-# metadata={
-# "category": "collaboration",
-# "complexity": "high",
-# },
-# ),
-# SwarmType(
-# name="SpreadSheetSwarm",
-# description="""
-# Collaborative data processing and analysis in a spreadsheet-like environment, facilitating
-# real-time data sharing and visualization. Specializes in data analysis, tabular processing,
-# collaborative editing, data transformation, spreadsheet operations, data visualization,
-# real-time collaboration, and structured data handling. Perfect for data-intensive tasks
-# requiring structured analysis.
-# """,
-# metadata={
-# "category": "data_processing",
-# "complexity": "medium",
-# },
-# ),
-# SwarmType(
-# name="SequentialWorkflow",
-# description="""
-# Execute tasks in a step-by-step, sequential process workflow, ensuring a logical and methodical
-# approach to task execution. Focuses on linear processing, waterfall methodology, step-by-step
-# execution, ordered tasks, sequential operations, process flow, systematic approach, and staged
-# execution. Best for tasks requiring strict order and dependencies.
-# """,
-# metadata={"category": "workflow", "complexity": "low"},
-# ),
-# SwarmType(
-# name="ConcurrentWorkflow",
-# description="""
-# Process multiple tasks or data sources concurrently in parallel, maximizing productivity
-# and reducing processing time. Specializes in parallel processing, multi-threading,
-# asynchronous execution, distributed computing, concurrent operations, simultaneous tasks,
-# parallel workflows, and scalable processing. Ideal for independent tasks that can be
-# processed simultaneously.
-# """,
-# metadata={"category": "workflow", "complexity": "medium"},
-# ),
-# ]
-
-# matcher.add_swarm_types(swarm_types)
-# logger.info("Initialized default swarm types")
-
-
-# def create_swarm_matcher(
-# persist_dir: str = "./chroma_db",
-# collection_name: str = "swarm_types",
-# ) -> SwarmMatcher:
-# """Convenience function to create and initialize a swarm matcher"""
+# # Example usage
+# if __name__ == "__main__":
+# # Create configuration
# config = SwarmMatcherConfig(
-# persist_directory=persist_dir, collection_name=collection_name
+# backend="openai", # Use OpenAI embeddings for this example
+# similarity_threshold=0.6, # Raise the threshold for stricter matching
+# cache_embeddings=True,
# )
+
+# # Initialize matcher
# matcher = SwarmMatcher(config)
-# initialize_default_swarm_types(matcher)
-# return matcher
+# task = "I need to concurrently run 1000 tasks"
-# # Example usage
-# def swarm_matcher(task: str) -> str:
-# # Create and initialize matcher
-# matcher = create_swarm_matcher()
-
-# swarm_type = matcher.auto_select_swarm(task)
-# print(f"Task: {task}\nSelected Swarm: {swarm_type}\n")
-
-# return swarm_type
-
-
-# # # Example usage
-# # if __name__ == "__main__":
-# # # Create and initialize matcher
-# # matcher = create_swarm_matcher()
-
-# # # Example tasks
-# # tasks = [
-# # "Analyze this spreadsheet of sales data and create visualizations",
-# # "Coordinate multiple AI agents to solve a complex problem",
-# # "Process these tasks one after another in a specific order",
-# # "Write multiple blog posts about the latest advancements in swarm intelligence all at once",
-# # "Write a blog post about the latest advancements in swarm intelligence",
-# # ]
-
-# # # Process tasks
-# # for task in tasks:
-# # swarm_type = matcher.auto_select_swarm(task)
-# # print(f"Task: {task}\nSelected Swarm: {swarm_type}\n")
+# print(matcher.auto_select_swarm(task))
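+# # Illustrative expectation (assuming the default swarm types are
+# # registered): a task about running many independent tasks in parallel
+# # should score highest against "ConcurrentWorkflow", so
+# # auto_select_swarm would print that name once the 0.6 similarity
+# # threshold is met.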
diff --git a/swarms/structs/swarm_router.py b/swarms/structs/swarm_router.py
index 98c7da19..023db9d0 100644
--- a/swarms/structs/swarm_router.py
+++ b/swarms/structs/swarm_router.py
@@ -25,6 +25,8 @@ from swarms.utils.loguru_logger import initialize_logger
from swarms.structs.malt import MALT
from swarms.structs.deep_research_swarm import DeepResearchSwarm
from swarms.structs.council_judge import CouncilAsAJudge
+from swarms.structs.interactive_groupchat import InteractiveGroupChat
+
logger = initialize_logger(log_folder="swarm_router")
@@ -43,6 +45,7 @@ SwarmType = Literal[
"MALT",
"DeepResearchSwarm",
"CouncilAsAJudge",
+ "InteractiveGroupChat",
]
@@ -187,7 +190,7 @@ class SwarmRouter:
shared_memory_system: Any = None,
rules: str = None,
documents: List[str] = [], # A list of docs file paths
- output_type: OutputType = "dict",
+ output_type: OutputType = "dict-all-except-first",
no_cluster_ops: bool = False,
speaker_fn: callable = None,
load_agents_from_csv: bool = False,
@@ -385,6 +388,15 @@ class SwarmRouter:
base_agent=self.agents[0] if self.agents else None,
)
+ elif self.swarm_type == "InteractiveGroupChat":
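+ # Interactive group chat with user participation: the router passes
+ # its agents, loop count, and output type through unchanged.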
+ return InteractiveGroupChat(
+ name=self.name,
+ description=self.description,
+ agents=self.agents,
+ max_loops=self.max_loops,
+ output_type=self.output_type,
+ )
+
elif self.swarm_type == "DeepResearchSwarm":
return DeepResearchSwarm(
name=self.name,
diff --git a/swarms/utils/__init__.py b/swarms/utils/__init__.py
index b604186d..53cbcea6 100644
--- a/swarms/utils/__init__.py
+++ b/swarms/utils/__init__.py
@@ -11,7 +11,6 @@ from swarms.utils.file_processing import (
create_file_in_folder,
zip_folders,
)
-from swarms.utils.markdown_message import display_markdown_message
from swarms.utils.parse_code import extract_code_from_markdown
from swarms.utils.pdf_to_text import pdf_to_text
from swarms.utils.try_except_wrapper import try_except_wrapper
@@ -33,7 +32,6 @@ __all__ = [
"zip_workspace",
"create_file_in_folder",
"zip_folders",
- "display_markdown_message",
"extract_code_from_markdown",
"pdf_to_text",
"try_except_wrapper",
diff --git a/swarms/utils/history_output_formatter.py b/swarms/utils/history_output_formatter.py
index b4b625b3..e190dd8e 100644
--- a/swarms/utils/history_output_formatter.py
+++ b/swarms/utils/history_output_formatter.py
@@ -25,6 +25,8 @@ def history_output_formatter(
return conversation.return_all_except_first()
elif type == "str-all-except-first":
return conversation.return_all_except_first_string()
+ elif type == "dict-final":
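+ # Only the conversation's final message, rendered as a dict
+ # (via the return_dict_final helper on Conversation).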
+ return conversation.return_dict_final()
elif type == "xml":
data = conversation.to_dict()
return to_xml_string(data, root_tag="conversation")
diff --git a/swarms/utils/markdown_message.py b/swarms/utils/markdown_message.py
deleted file mode 100644
index 03a35092..00000000
--- a/swarms/utils/markdown_message.py
+++ /dev/null
@@ -1,21 +0,0 @@
-from swarms.utils.formatter import formatter
-
-
-def display_markdown_message(message: str, color: str = "cyan"):
- """
- Display markdown message. Works with multiline strings with lots of indentation.
- Will automatically make single line > tags beautiful.
- """
-
- for line in message.split("\n"):
- line = line.strip()
- if line == "":
- print()
- elif line == "---":
- formatter.print_panel("-" * 50)
- else:
- formatter.print_panel(line)
-
- if "\n" not in message and message.startswith(">"):
- # Aesthetic choice. For these tags, they need a space below them
- print()
diff --git a/swarms/utils/output_types.py b/swarms/utils/output_types.py
index c09c4a6f..843d4608 100644
--- a/swarms/utils/output_types.py
+++ b/swarms/utils/output_types.py
@@ -16,6 +16,7 @@ HistoryOutputType = Literal[
"dict-all-except-first",
"str-all-except-first",
"basemodel",
+ "dict-final",
]
OutputType = HistoryOutputType
diff --git a/swarms/utils/visualizer.py b/swarms/utils/visualizer.py
deleted file mode 100644
index 849cfe28..00000000
--- a/swarms/utils/visualizer.py
+++ /dev/null
@@ -1,510 +0,0 @@
-import asyncio
-from dataclasses import dataclass
-from datetime import datetime
-from typing import Any, Callable, Dict, List, Optional
-
-import psutil
-from rich.console import Console
-from rich.layout import Layout
-from rich.live import Live
-from rich.panel import Panel
-from rich.progress import (
- Progress,
- SpinnerColumn,
- TextColumn,
- TimeElapsedColumn,
-)
-from rich.table import Table
-from rich.text import Text
-from rich.tree import Tree
-
-from swarms.structs.agent import Agent
-
-try:
- import pynvml
-
- pynvml.nvmlInit()
- GPU_ENABLED = True
-except ImportError:
- GPU_ENABLED = False
-
-
-@dataclass
-class SwarmMetadata:
- name: Optional[str] = None
- description: Optional[str] = None
- version: Optional[str] = None
- type: Optional[str] = None # hierarchical, parallel, sequential
- created_at: Optional[datetime] = None
- author: Optional[str] = None
- tags: Optional[List[str]] = None
- primary_objective: Optional[str] = None
- secondary_objectives: Optional[List[str]] = None
-
- def __post_init__(self):
- self.tags = self.tags or []
- self.secondary_objectives = self.secondary_objectives or []
- self.created_at = self.created_at or datetime.now()
-
-
-class SwarmVisualizationRich:
- def __init__(
- self,
- swarm_metadata: SwarmMetadata,
- agents: List[Agent],
- update_resources: bool = True,
- refresh_rate: float = 0.1,
- ):
- """
- Initializes the visualizer with a list of agents.
-
- Args:
- swarm_metadata (SwarmMetadata): Metadata for the swarm.
- agents (List[Agent]): List of root agents.
- update_resources (bool): Whether to update system resource stats.
- refresh_rate (float): Refresh rate for the live visualization.
- """
- self.swarm_metadata = swarm_metadata
- self.agents = agents
- self.update_resources = update_resources
- self.refresh_rate = refresh_rate
- self.console = Console()
- self.live = None
- # A dictionary mapping agent names to list of output messages
- self.output_history: Dict[str, List[Dict[str, Any]]] = {}
-
- # System monitoring
- self.cores_available = 0
- self.memory_usage = "N/A"
- self.gpu_power = "N/A"
- self.start_time = datetime.now()
-
- if self.update_resources:
- self._update_resource_stats()
-
- def _format_uptime(self) -> str:
- """Formats the swarm's uptime."""
- delta = datetime.now() - self.start_time
- hours, remainder = divmod(delta.seconds, 3600)
- minutes, seconds = divmod(remainder, 60)
- return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
-
- def _build_agent_tree(self, agents: List[Agent]) -> Tree:
- """
- Builds a detailed tree visualization for a list of agents.
-
- Args:
- agents (List[Agent]): The list of root agents.
-
- Returns:
- Tree: A rich Tree object displaying agent metadata.
- """
- tree = Tree("[bold underline]Agents[/bold underline]")
- for agent in agents:
- self._add_agent_to_tree(agent, tree)
- return tree
-
- def _add_agent_to_tree(self, agent: Agent, tree: Tree) -> None:
- """
- Recursively adds an agent and its children to the given tree.
-
- Args:
- agent (Agent): The agent to add.
- tree (Tree): The tree to update.
- """
- agent_info = [
- f"[bold cyan]{agent.name}[/bold cyan]",
- f"[yellow]Role:[/yellow] {agent.role}",
- ]
-
- # # Add any custom metadata from the agent (if available)
- # for key, value in getattr(agent, "metadata", {}).items():
- # agent_info.append(f"[white]{key}:[/white] {value}")
-
- # # Parameters summary if available
- # parameters = getattr(agent, "parameters", {})
- # if parameters:
- # param_summary = ", ".join(f"{k}: {v}" for k, v in parameters.items())
- # agent_info.append(f"[white]Parameters:[/white] {param_summary}")
-
- node_text = "\n".join(agent_info)
- branch = tree.add(node_text)
- for child in getattr(agent, "children", []):
- self._add_agent_to_tree(child, branch)
-
- def _count_agents(self, agents: List[Agent]) -> int:
- """
- Recursively counts total number of agents from a list of root agents.
-
- Args:
- agents (List[Agent]): List of agents.
-
- Returns:
- int: Total count of agents including children.
- """
- return len(agents)
-
- def _create_unified_info_panel(self) -> Panel:
- """
- Creates a unified panel showing swarm metadata and agents' metadata.
- """
- info_layout = Layout()
- info_layout.split_column(
- Layout(name="metadata", size=15),
- Layout(name="architecture"),
- )
-
- total_agents = self._count_agents(self.agents)
-
- # Metadata section
- metadata_table = Table.grid(padding=1, expand=True)
- metadata_table.add_column("Label", style="bold cyan")
- metadata_table.add_column("Value", style="white")
-
- # Update system resources if needed
- if self.update_resources:
- self._update_resource_stats()
-
- # Wrap the description text properly
- description_text = Text(
- self.swarm_metadata.description or "", style="italic"
- )
- description_text.wrap(self.console, width=60, overflow="fold")
-
- metadata_table.add_row(
- "Swarm Name", self.swarm_metadata.name or "N/A"
- )
- metadata_table.add_row("Description", description_text)
- metadata_table.add_row(
- "Version", self.swarm_metadata.version or "N/A"
- )
- metadata_table.add_row("Total Agents", str(total_agents))
- metadata_table.add_row(
- "Author", self.swarm_metadata.author or "N/A"
- )
- metadata_table.add_row(
- "System",
- f"CPU: {self.cores_available} cores | Memory: {self.memory_usage}",
- )
- metadata_table.add_row(
- "Primary Objective",
- self.swarm_metadata.primary_objective or "N/A",
- )
-
- info_layout["metadata"].update(metadata_table)
-
- # Architecture section with the agent tree
- architecture_tree = self._build_agent_tree(self.agents)
- info_layout["architecture"].update(architecture_tree)
-
- return Panel(
- info_layout,
- title="[bold]Swarm Information & Architecture[/bold]",
- )
-
- def _create_outputs_panel(self) -> Panel:
- """
- Creates a panel that displays stacked message history for all agents.
- """
- all_messages = []
-
- def collect_agent_messages(agent: Agent):
- """Recursively collect messages from an agent and its children."""
- messages = self.output_history.get(agent.name, [])
- for msg in messages:
- all_messages.append(
- {
- "agent": agent.name,
- "time": msg["time"],
- "content": msg["content"],
- "style": msg["style"],
- }
- )
- for child in getattr(agent, "children", []):
- collect_agent_messages(child)
-
- # Collect messages from every root agent
- for agent in self.agents:
- collect_agent_messages(agent)
-
- # Sort messages by timestamp
- all_messages.sort(key=lambda x: x["time"])
-
- messages_container = []
- for msg in all_messages:
- message_text = Text()
- message_text.append(f"[{msg['time']}] ", style="dim")
- message_text.append(
- f"{msg['agent']}: ", style="bold cyan"
- )
- message_text.append(msg["content"], style=msg["style"])
- messages_container.append(message_text)
-
- if messages_container:
- final_text = Text("\n").join(messages_container)
- else:
- final_text = Text("No messages yet...", style="dim")
-
- return Panel(
- final_text,
- title="[bold]Agent Communication Log[/bold]",
- border_style="green",
- padding=(1, 2),
- )
-
- def _update_resource_stats(self):
- """Updates system resource statistics."""
- self.cores_available = psutil.cpu_count(logical=True)
- mem_info = psutil.virtual_memory()
- total_gb = mem_info.total / (1024**3)
- used_gb = mem_info.used / (1024**3)
- self.memory_usage = f"{used_gb:.1f}GB / {total_gb:.1f}GB ({mem_info.percent}%)"
-
- if GPU_ENABLED:
- try:
- device_count = pynvml.nvmlDeviceGetCount()
- gpu_info = []
- for i in range(device_count):
- handle = pynvml.nvmlDeviceGetHandleByIndex(i)
- name = pynvml.nvmlDeviceGetName(handle).decode()
- mem = pynvml.nvmlDeviceGetMemoryInfo(handle)
- usage = (mem.used / mem.total) * 100
- gpu_info.append(f"{name}: {usage:.1f}%")
- self.gpu_power = " | ".join(gpu_info)
- except Exception as e:
- self.gpu_power = f"GPU Error: {str(e)}"
- else:
- self.gpu_power = "No GPU detected"
-
- async def stream_output(
- self,
- agent: Agent,
- text: str,
- title: Optional[str] = None,
- style: str = "bold cyan",
- delay: float = 0.05,
- by_word: bool = False,
- ):
- """
- Streams output for a specific agent with token-by-token animation.
-
- Args:
- agent (Agent): The agent whose output is being streamed.
- text (str): The text to stream.
- title (Optional[str]): Custom title for the output panel.
- style (str): Style for the output text.
- delay (float): Delay between tokens.
- by_word (bool): If True, stream word by word instead of character by character.
- """
- display_text = Text(style=style)
- current_output = ""
-
- tokens = text.split() if by_word else text
- title = title or f"{agent.name} Output"
-
- for token in tokens:
- token_with_space = token + (" " if by_word else "")
- current_output += token_with_space
- display_text.append(token_with_space)
-
- if agent.name not in self.output_history:
- self.output_history[agent.name] = []
-
- if token == tokens[-1]:
- timestamp = datetime.now().strftime("%H:%M:%S")
- self.output_history[agent.name].append(
- {
- "time": timestamp,
- "content": current_output,
- "style": style,
- }
- )
-
- if self.live:
- self.live.update(self._create_layout())
- await asyncio.sleep(delay)
-
- def log_agent_output(self, agent: Agent, text: str):
- asyncio.create_task(
- self.stream_output(
- agent=agent,
- text=text,
- title=f"{agent.name} Output {agent.max_loops}",
- )
- )
-
- async def print_progress(
- self,
- description: str,
- task_fn: Callable,
- *args: Any,
- **kwargs: Any,
- ) -> Any:
- """
- Displays a progress spinner while executing a task.
-
- Args:
- description (str): Task description.
- task_fn (Callable): Function to execute.
- *args (Any): Arguments for task_fn.
- **kwargs (Any): Keyword arguments for task_fn.
-
- Returns:
- Any: The result of task_fn.
- """
- progress = Progress(
- SpinnerColumn(),
- TextColumn("[progress.description]{task.description}"),
- TimeElapsedColumn(),
- )
-
- try:
- with progress:
- task = progress.add_task(description, total=None)
- result = await task_fn(*args, **kwargs)
- progress.update(task, completed=True)
- return result
- except Exception as e:
- progress.stop()
- raise e
-
- def _create_layout(self) -> Layout:
- """Creates the main visualization layout."""
- layout = Layout()
- layout.split_row(
- Layout(name="info", ratio=2),
- Layout(name="outputs", ratio=3),
- )
-
- layout["info"].update(self._create_unified_info_panel())
- layout["outputs"].update(self._create_outputs_panel())
-
- return layout
-
- async def start(self):
- """Starts the visualization with live updates."""
- with Live(
- self._create_layout(),
- refresh_per_second=int(1 / self.refresh_rate),
- ) as self.live:
- while True:
-
- def process_agent_streams(agent: Agent):
- while not agent.output_stream.empty():
- new_output = agent.output_stream.get()
- asyncio.create_task(
- self.stream_output(agent, new_output)
- )
- for child in getattr(agent, "children", []):
- process_agent_streams(child)
-
- # Process streams for each root agent
- for agent in self.agents:
- process_agent_streams(agent)
- await asyncio.sleep(self.refresh_rate)
-
-
-# # Example usage
-# if __name__ == "__main__":
-# # Create swarm metadata
-# swarm_metadata = SwarmMetadata(
-# name="Financial Advisory Swarm",
-# description="Intelligent swarm for financial analysis and advisory",
-# version="1.0.0",
-# type="hierarchical",
-# created_at=datetime.now(),
-# author="AI Research Team",
-# # tags=["finance", "analysis", "advisory"],
-# primary_objective="Provide comprehensive financial analysis and recommendations",
-# secondary_objectives=[
-# "Monitor market trends",
-# "Analyze competitor behavior",
-# "Generate investment strategies",
-# ],
-# )
-
-# # Create agent hierarchy with detailed parameters
-# analyst = Agent(
-# name="Financial Analyst",
-# role="Analysis",
-# description="Analyzes financial data and market trends",
-# agent_type="LLM",
-# capabilities=[
-# "data analysis",
-# "trend detection",
-# "risk assessment",
-# ],
-# parameters={"model": "gpt-4", "temperature": 0.7},
-# metadata={
-# "specialty": "Market Analysis",
-# "confidence_threshold": "0.85",
-# },
-# )
-
-# researcher = Agent(
-# name="Market Researcher",
-# role="Research",
-# description="Conducts market research and competitor analysis",
-# agent_type="Neural",
-# capabilities=[
-# "competitor analysis",
-# "market sentiment",
-# "trend forecasting",
-# ],
-# parameters={"batch_size": 32, "learning_rate": 0.001},
-# metadata={
-# "data_sources": "Bloomberg, Reuters",
-# "update_frequency": "1h",
-# },
-# )
-
-# advisor = Agent(
-# name="Investment Advisor",
-# role="Advisory",
-# description="Provides investment recommendations",
-# agent_type="Hybrid",
-# capabilities=[
-# "portfolio optimization",
-# "risk management",
-# "strategy generation",
-# ],
-# parameters={
-# "risk_tolerance": "moderate",
-# "time_horizon": "long",
-# },
-# metadata={
-# "certification": "CFA Level 3",
-# "specialization": "Equity",
-# },
-# children=[analyst, researcher],
-# )
-
-# # Create visualization
-# viz = SwarmVisualizationRich(
-# swarm_metadata=swarm_metadata,
-# root_agent=advisor,
-# refresh_rate=0.1,
-# )
-
-# # Example of streaming output simulation
-# async def simulate_outputs():
-# await viz.stream_output(
-# advisor,
-# "Analyzing market conditions...\nGenerating investment advice...",
-# )
-# await viz.stream_output(
-# analyst,
-# "Processing financial data...\nIdentifying trends...",
-# )
-# await viz.stream_output(
-# researcher,
-# "Researching competitor movements...\nAnalyzing market share...",
-# )
-
-# # Run the visualization
-# async def main():
-# viz_task = asyncio.create_task(viz.start())
-# await simulate_outputs()
-# await viz_task
-
-# asyncio.run(main())
diff --git a/swarms/utils/xml_utils.py b/swarms/utils/xml_utils.py
index e3ccd308..be2dcfcf 100644
--- a/swarms/utils/xml_utils.py
+++ b/swarms/utils/xml_utils.py
@@ -3,7 +3,22 @@ from typing import Any
def dict_to_xml(tag: str, d: dict) -> ET.Element:
- """Convert a dictionary to an XML Element."""
+ """
+ Convert a dictionary to an XML Element.
+
+ Args:
+ tag (str): The tag name for the root element
+ d (dict): The dictionary to convert to XML
+
+ Returns:
+ ET.Element: An XML Element representing the dictionary structure
+
+ Example:
+ >>> data = {"person": {"name": "John", "age": 30}}
+ >>> elem = dict_to_xml("root", data)
+ >>> ET.tostring(elem, encoding="unicode")
+ '<root><person><name>John</name><age>30</age></person></root>'
+ """
elem = ET.Element(tag)
for key, val in d.items():
child = ET.Element(str(key))
@@ -24,7 +39,27 @@ def dict_to_xml(tag: str, d: dict) -> ET.Element:
def to_xml_string(data: Any, root_tag: str = "root") -> str:
- """Convert a dict or list to an XML string."""
+ """
+ Convert a dict or list to an XML string.
+
+ Args:
+ data (Any): The data to convert to XML. Can be a dictionary, list, or other value
+ root_tag (str, optional): The tag name for the root element. Defaults to "root"
+
+ Returns:
+ str: An XML string representation of the input data
+
+ Example:
+ >>> data = {"person": {"name": "John", "age": 30}}
+ >>> xml_str = to_xml_string(data)
+ >>> print(xml_str)
+ <root><person><name>John</name><age>30</age></person></root>
+
+ >>> data = [1, 2, 3]
+ >>> xml_str = to_xml_string(data)
+ >>> print(xml_str)
+ <root><item>1</item><item>2</item><item>3</item></root>
+ """
if isinstance(data, dict):
elem = dict_to_xml(root_tag, data)
elif isinstance(data, list):
diff --git a/v0_model.py b/v0_model.py
deleted file mode 100644
index 5546ffb9..00000000
--- a/v0_model.py
+++ /dev/null
@@ -1,79 +0,0 @@
-# 'v0-1.0-md'
-# https://api.v0.dev/v1/chat/completions
-
-import time
-from swarms import Agent
-import os
-from dotenv import load_dotenv
-
-load_dotenv()
-
-FRONT_END_DEVELOPMENT_PROMPT = """
- You are an expert full-stack development agent with comprehensive expertise in:
-
- Frontend Development:
- - Modern React.js/Next.js architecture and best practices
- - Advanced TypeScript implementation and type safety
- - State-of-the-art UI/UX design patterns
- - Responsive and accessible design principles
- - Component-driven development with Storybook
- - Modern CSS frameworks (Tailwind, Styled-Components)
- - Performance optimization and lazy loading
-
- Backend Development:
- - Scalable microservices architecture
- - RESTful and GraphQL API design
- - Database optimization and schema design
- - Authentication and authorization systems
- - Serverless architecture and cloud services
- - CI/CD pipeline implementation
- - Security best practices and OWASP guidelines
-
- Development Practices:
- - Test-Driven Development (TDD)
- - Clean Code principles
- - Documentation (TSDoc/JSDoc)
- - Git workflow and version control
- - Performance monitoring and optimization
- - Error handling and logging
- - Code review best practices
-
- Your core responsibilities include:
- 1. Developing production-grade TypeScript applications
- 2. Implementing modern, accessible UI components
- 3. Designing scalable backend architectures
- 4. Writing comprehensive documentation
- 5. Ensuring type safety across the stack
- 6. Optimizing application performance
- 7. Implementing security best practices
-
- You maintain strict adherence to:
- - TypeScript strict mode and proper typing
- - SOLID principles and clean architecture
- - Accessibility standards (WCAG 2.1)
- - Performance budgets and metrics
- - Security best practices
- - Comprehensive test coverage
- - Modern design system principles
-"""
-
-# Initialize the agent
-agent = Agent(
- agent_name="Quantitative-Trading-Agent",
- agent_description="Advanced quantitative trading and algorithmic analysis agent",
- system_prompt=FRONT_END_DEVELOPMENT_PROMPT,
- max_loops=1,
- model_name="v0-1.0-md",
- dynamic_temperature_enabled=True,
- output_type="all",
- # safety_prompt_on=True,
- llm_api_key=os.getenv("V0_API_KEY"),
- llm_base_url="https://api.v0.dev/v1/chat/completions",
-)
-
-out = agent.run(
- "Build a simple web app that allows users to upload a file and then download it."
-)
-
-time.sleep(10)
-print(out)