[FIX][AgentLoader] [REMOVE][DeepResearchSwarm]

pull/1043/head
Kye Gomez 2 weeks ago
parent 582edf427f
commit bff7a18047

@ -1,186 +0,0 @@
# Deep Research Swarm
!!! abstract "Overview"
The Deep Research Swarm is a powerful, production-grade research system that conducts comprehensive analysis across multiple domains using parallel processing and advanced AI agents.
Key Features:
- Parallel search processing
- Multi-agent research coordination
- Advanced information synthesis
- Automated query generation
- Concurrent task execution
## Getting Started
!!! tip "Quick Installation"
```bash
pip install swarms
```
=== "Basic Usage"
```python
from swarms.structs import DeepResearchSwarm
# Initialize the swarm
swarm = DeepResearchSwarm(
name="MyResearchSwarm",
output_type="json",
max_loops=1
)
# Run a single research task
results = swarm.run("What are the latest developments in quantum computing?")
```
=== "Batch Processing"
```python
# Run multiple research tasks in parallel
tasks = [
"What are the environmental impacts of electric vehicles?",
"How is AI being used in drug discovery?",
]
batch_results = swarm.batched_run(tasks)
```
## Configuration
!!! info "Constructor Arguments"
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `name` | str | "DeepResearchSwarm" | Name identifier for the swarm |
| `description` | str | "A swarm that conducts..." | Description of the swarm's purpose |
| `research_agent` | Agent | research_agent | Custom research agent instance |
| `max_loops` | int | 1 | Maximum number of research iterations |
| `nice_print` | bool | True | Enable formatted console output |
| `output_type` | str | "json" | Output format ("json" or "string") |
| `max_workers` | int | CPU_COUNT * 2 | Maximum concurrent threads |
| `token_count` | bool | False | Enable token counting |
| `research_model_name` | str | "gpt-4o-mini" | Model to use for research |
## Core Methods
### Run
!!! example "Single Task Execution"
```python
results = swarm.run("What are the latest breakthroughs in fusion energy?")
```
### Batched Run
!!! example "Parallel Task Execution"
```python
tasks = [
"What are current AI safety initiatives?",
"How is CRISPR being used in agriculture?",
]
results = swarm.batched_run(tasks)
```
### Step
!!! example "Single Step Execution"
```python
results = swarm.step("Analyze recent developments in renewable energy storage")
```
## Domain-Specific Examples
=== "Scientific Research"
```python
science_swarm = DeepResearchSwarm(
name="ScienceSwarm",
output_type="json",
max_loops=2 # More iterations for thorough research
)
results = science_swarm.run(
"What are the latest experimental results in quantum entanglement?"
)
```
=== "Market Research"
```python
market_swarm = DeepResearchSwarm(
name="MarketSwarm",
output_type="json"
)
results = market_swarm.run(
"What are the emerging trends in electric vehicle battery technology market?"
)
```
=== "News Analysis"
```python
news_swarm = DeepResearchSwarm(
name="NewsSwarm",
output_type="string" # Human-readable output
)
results = news_swarm.run(
"What are the global economic impacts of recent geopolitical events?"
)
```
=== "Medical Research"
```python
medical_swarm = DeepResearchSwarm(
name="MedicalSwarm",
max_loops=2
)
results = medical_swarm.run(
"What are the latest clinical trials for Alzheimer's treatment?"
)
```
## Advanced Features
??? note "Custom Research Agent"
```python
from swarms import Agent
custom_agent = Agent(
agent_name="SpecializedResearcher",
system_prompt="Your specialized prompt here",
model_name="gpt-4"
)
swarm = DeepResearchSwarm(
research_agent=custom_agent,
max_loops=2
)
```
??? note "Parallel Processing Control"
```python
swarm = DeepResearchSwarm(
max_workers=8, # Limit to 8 concurrent threads
nice_print=False # Disable console output for production
)
```
## Best Practices
!!! success "Recommended Practices"
1. **Query Formulation**: Be specific and clear in your research queries
2. **Resource Management**: Adjust `max_workers` based on your system's capabilities
3. **Output Handling**: Use appropriate `output_type` for your use case
4. **Error Handling**: Implement try-catch blocks around swarm operations
5. **Model Selection**: Choose appropriate models based on research complexity
## Limitations
!!! warning "Known Limitations"
- Requires valid API keys for external services
- Performance depends on system resources
- Rate limits may apply to external API calls
- Token limits apply to model responses

@ -15,7 +15,6 @@ The `SwarmRouter` class is a flexible routing system designed to manage differen
| `HiearchicalSwarm` | Hierarchical organization of agents |
| `MajorityVoting` | Uses majority voting for decision making |
| `MALT` | Multi-Agent Language Tasks |
| `DeepResearchSwarm` | Specialized for deep research tasks |
| `CouncilAsAJudge` | Council-based judgment system |
| `InteractiveGroupChat` | Interactive group chat with user participation |
| `auto` | Automatically selects best swarm type via embedding search |

@ -18,7 +18,7 @@ The AgentLoader enables you to:
The AgentLoader is included with the Swarms framework:
```python
from swarms.utils import AgentLoader, load_agent_from_markdown, load_agents_from_markdown
from swarms import AgentLoader, load_agent_from_markdown, load_agents_from_markdown
```
## Markdown Format
@ -99,7 +99,7 @@ result = workflow.run(task)
For more advanced usage, use the `AgentLoader` class directly:
```python
from swarms.utils import AgentLoader
from swarms import AgentLoader
# Initialize loader
loader = AgentLoader()
@ -209,7 +209,7 @@ response = agent.run(
The AgentLoader provides comprehensive error handling:
```python
from swarms.utils import AgentLoader
from swarms import AgentLoader
loader = AgentLoader()

@ -233,7 +233,6 @@ Available swarm types for different execution patterns.
| `Auto` | Automatically selects the best swarm type |
| `MajorityVoting` | Agents vote on decisions |
| `Malt` | Multi-Agent Language Tasks |
| `DeepResearchSwarm` | Specialized for deep research tasks |
## Detailed Examples

@ -1,10 +0,0 @@
from swarms.structs.deep_research_swarm import DeepResearchSwarm
model = DeepResearchSwarm(
research_model_name="groq/deepseek-r1-distill-qwen-32b"
)
model.run(
"What are the latest research papers on extending telomeres in humans? Give 1 queries for the search not too many`"
)

@ -1,12 +0,0 @@
from swarms.structs.deep_research_swarm import DeepResearchSwarm
swarm = DeepResearchSwarm(
name="Deep Research Swarm",
description="A swarm that conducts comprehensive research across multiple domains",
max_loops=1,
)
swarm.run(
"What are the biggest gas and oil companies in russia? Only provide 3 queries"
)

@ -1,13 +0,0 @@
from swarms.structs.deep_research_swarm import DeepResearchSwarm
def main():
swarm = DeepResearchSwarm(
name="Deep Research Swarm",
description="A swarm of agents that can perform deep research on a given topic",
)
swarm.run("What are the latest news in the AI an crypto space")
main()

@ -1,23 +0,0 @@
from swarms.structs.deep_research_swarm import DeepResearchSwarm
def main():
swarm = DeepResearchSwarm(
name="Deep Research Swarm",
description="A swarm of agents that can perform deep research on a given topic",
output_type="string", # Change to string output type for better readability
)
# Format the query as a proper question
query = "What are the latest developments and news in the AI and cryptocurrency space?"
try:
result = swarm.run(query)
print("\nResearch Results:")
print(result)
except Exception as e:
print(f"Error occurred: {str(e)}")
if __name__ == "__main__":
main()

@ -1,13 +0,0 @@
from swarms.structs.deep_research_swarm import DeepResearchSwarm
swarm = DeepResearchSwarm(
name="Deep Research Swarm",
description="A swarm of agents that can perform deep research on a given topic",
output_type="all", # Change to string output type for better readability
)
out = swarm.run(
"What are the latest developments and news in the AI and cryptocurrency space?"
)
print(out)

@ -20,7 +20,7 @@ from swarms.agents.create_agents_from_yaml import (
)
from swarms.cli.onboarding_process import OnboardingProcess
from swarms.structs.agent import Agent
from swarms.utils.agent_loader import AgentLoader
from swarms.structs.agent_loader import AgentLoader
from swarms.utils.formatter import formatter
from dotenv import load_dotenv

@ -14,7 +14,6 @@ SwarmType = Literal[
"auto",
"MajorityVoting",
"MALT",
"DeepResearchSwarm",
"CouncilAsAJudge",
"InteractiveGroupChat",
]

@ -10,7 +10,6 @@ from swarms.structs.conversation import Conversation
from swarms.structs.council_as_judge import CouncilAsAJudge
from swarms.structs.cron_job import CronJob
from swarms.structs.de_hallucination_swarm import DeHallucinationSwarm
from swarms.structs.deep_research_swarm import DeepResearchSwarm
from swarms.structs.graph_workflow import (
Edge,
GraphWorkflow,
@ -164,7 +163,6 @@ __all__ = [
"AgentsBuilder",
"MALT",
"DeHallucinationSwarm",
"DeepResearchSwarm",
"HybridHierarchicalClusterSwarm",
"get_agents_info",
"get_swarms_info",

@ -0,0 +1,208 @@
import os
from typing import List, Union
from swarms.agents.create_agents_from_yaml import (
ReturnTypes,
create_agents_from_yaml,
)
from swarms.structs.agent import Agent
from swarms.structs.csv_to_agent import AgentLoader as CSVAgentLoader
from swarms.utils.agent_loader_markdown import (
load_agent_from_markdown,
load_agents_from_markdown,
AgentLoader as MarkdownAgentLoader,
)
class AgentLoader:
"""
Loader class for creating Agent objects from various file formats.
This class provides methods to load agents from Markdown, YAML, and CSV files.
"""
def __init__(self):
"""
Initialize the AgentLoader instance.
"""
pass
def load_agents_from_markdown(
self,
file_paths: Union[str, List[str]],
concurrent: bool = True,
max_file_size_mb: float = 10.0,
**kwargs,
) -> List[Agent]:
"""
Load multiple agents from one or more Markdown files.
Args:
file_paths (Union[str, List[str]]): Path or list of paths to Markdown file(s) containing agent definitions.
concurrent (bool, optional): Whether to load files concurrently. Defaults to True.
max_file_size_mb (float, optional): Maximum file size in MB to process. Defaults to 10.0.
**kwargs: Additional keyword arguments passed to the underlying loader.
Returns:
List[Agent]: A list of loaded Agent objects.
"""
return load_agents_from_markdown(
file_paths=file_paths,
concurrent=concurrent,
max_file_size_mb=max_file_size_mb,
**kwargs,
)
def load_agent_from_markdown(
self, file_path: str, **kwargs
) -> Agent:
"""
Load a single agent from a Markdown file.
Args:
file_path (str): Path to the Markdown file containing the agent definition.
**kwargs: Additional keyword arguments passed to the underlying loader.
Returns:
Agent: The loaded Agent object.
"""
return load_agent_from_markdown(file_path=file_path, **kwargs)
def load_agents_from_yaml(
self,
yaml_file: str,
return_type: ReturnTypes = "auto",
**kwargs,
) -> List[Agent]:
"""
Load agents from a YAML file.
Args:
yaml_file (str): Path to the YAML file containing agent definitions.
return_type (ReturnTypes, optional): The return type for the loader. Defaults to "auto".
**kwargs: Additional keyword arguments passed to the underlying loader.
Returns:
List[Agent]: A list of loaded Agent objects.
"""
return create_agents_from_yaml(
yaml_file=yaml_file, return_type=return_type, **kwargs
)
def load_many_agents_from_yaml(
self,
yaml_files: List[str],
return_types: List[ReturnTypes] = ["auto"],
**kwargs,
) -> List[Agent]:
"""
Load agents from multiple YAML files.
Args:
yaml_files (List[str]): List of YAML file paths containing agent definitions.
return_types (List[ReturnTypes], optional): List of return types for each YAML file. Defaults to ["auto"].
**kwargs: Additional keyword arguments passed to the underlying loader.
Returns:
List[Agent]: A list of loaded Agent objects from all files.
"""
return [
self.load_agents_from_yaml(
yaml_file=yaml_file,
return_type=return_types[i],
**kwargs,
)
for i, yaml_file in enumerate(yaml_files)
]
def load_agents_from_csv(
self, csv_file: str, **kwargs
) -> List[Agent]:
"""
Load agents from a CSV file.
Args:
csv_file (str): Path to the CSV file containing agent definitions.
**kwargs: Additional keyword arguments passed to the underlying loader.
Returns:
List[Agent]: A list of loaded Agent objects.
"""
loader = CSVAgentLoader(file_path=csv_file)
return loader.load_agents()
def auto(self, file_path: str, *args, **kwargs):
"""
Automatically load agents from a file based on its extension.
Args:
file_path (str): Path to the agent file (Markdown, YAML, or CSV).
*args: Additional positional arguments passed to the underlying loader.
**kwargs: Additional keyword arguments passed to the underlying loader.
Returns:
List[Agent]: A list of loaded Agent objects.
Raises:
ValueError: If the file type is not supported.
"""
if file_path.endswith(".md"):
return self.load_agents_from_markdown(
file_path, *args, **kwargs
)
elif file_path.endswith(".yaml"):
return self.load_agents_from_yaml(
file_path, *args, **kwargs
)
elif file_path.endswith(".csv"):
return self.load_agents_from_csv(
file_path, *args, **kwargs
)
else:
raise ValueError(f"Unsupported file type: {file_path}")
def load_single_agent(self, *args, **kwargs):
"""
Load a single agent from a file of a supported type.
Args:
*args: Positional arguments passed to the underlying loader.
**kwargs: Keyword arguments passed to the underlying loader.
Returns:
Agent: The loaded Agent object.
"""
return self.auto(*args, **kwargs)
def load_multiple_agents(
self, file_paths: List[str], *args, **kwargs
):
"""
Load multiple agents from a list of files of various supported types.
Args:
file_paths (List[str]): List of file paths to agent files (Markdown, YAML, or CSV).
*args: Additional positional arguments passed to the underlying loader.
**kwargs: Additional keyword arguments passed to the underlying loader.
Returns:
List[Agent]: A list of loaded Agent objects from all files.
"""
return [
self.auto(file_path, *args, **kwargs)
for file_path in file_paths
]
def parse_markdown_file(self, file_path: str):
"""
Parse a Markdown file and return the agents defined within.
Args:
file_path (str): Path to the Markdown file.
Returns:
List[Agent]: A list of Agent objects parsed from the file.
"""
return MarkdownAgentLoader(
max_workers=os.cpu_count()
).parse_markdown_file(file_path=file_path)

@ -85,7 +85,6 @@ Choose the most appropriate architecture based on task requirements:
- **HiearchicalSwarm**: Layered decision-making with management and execution tiers
- **MajorityVoting**: Democratic decision-making with voting mechanisms
- **MALT**: Multi-agent learning and training with knowledge sharing
- **DeepResearchSwarm**: Comprehensive research with multiple specialized investigators
- **CouncilAsAJudge**: Deliberative decision-making with expert panels
- **InteractiveGroupChat**: Dynamic group interactions with real-time collaboration
- **HeavySwarm**: High-capacity processing with multiple specialized agents

@ -52,8 +52,6 @@ board_logger = initialize_logger(
# ============================================================================
class BoardConfigModel(BaseModel):
"""
Configuration model for Board of Directors feature.
@ -471,7 +469,6 @@ def get_board_config(
return _board_config
def create_default_config_file(
file_path: str = "swarms_board_config.yaml",
) -> None:

@ -1,23 +1,25 @@
import concurrent.futures
import csv
import json
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import (
List,
Any,
Dict,
List,
TypedDict,
Any,
Union,
TypeVar,
Union,
)
from dataclasses import dataclass
import csv
import json
import yaml
from pathlib import Path
from enum import Enum
from swarms.structs.agent import Agent
from swarms.schemas.swarms_api_schemas import AgentSpec
from litellm import model_list
import concurrent.futures
from tqdm import tqdm
from swarms.schemas.swarms_api_schemas import AgentSpec
from swarms.structs.agent import Agent
# Type variable for agent configuration
AgentConfigType = TypeVar(
"AgentConfigType", bound=Union[AgentSpec, Dict[str, Any]]

@ -1,479 +0,0 @@
import concurrent.futures
import json
import os
from typing import Any, List
from dotenv import load_dotenv
from rich.console import Console
import requests
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.utils.formatter import formatter
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
from swarms.utils.str_to_dict import str_to_dict
console = Console()
load_dotenv()
# Number of worker threads for concurrent operations
MAX_WORKERS = (
os.cpu_count() * 2
) # Optimal number of workers based on CPU cores
def exa_search(query: str, **kwargs: Any) -> str:
"""Performs web search using Exa.ai API and returns formatted results."""
api_url = "https://api.exa.ai/search"
api_key = os.getenv("EXA_API_KEY")
if not api_key:
return "### Error\nEXA_API_KEY environment variable not set\n"
headers = {
"x-api-key": api_key,
"Content-Type": "application/json",
}
safe_kwargs = {
str(k): v
for k, v in kwargs.items()
if k is not None and v is not None and str(k) != "None"
}
payload = {
"query": query,
"useAutoprompt": True,
"numResults": safe_kwargs.get("num_results", 10),
"contents": {
"text": True,
"highlights": {"numSentences": 10},
},
}
for key, value in safe_kwargs.items():
if key not in payload and key not in [
"query",
"useAutoprompt",
"numResults",
"contents",
]:
payload[key] = value
try:
response = requests.post(
api_url, json=payload, headers=headers
)
if response.status_code != 200:
return f"### Error\nHTTP {response.status_code}: {response.text}\n"
json_data = response.json()
except Exception as e:
return f"### Error\n{str(e)}\n"
if "error" in json_data:
return f"### Error\n{json_data['error']}\n"
formatted_text = []
search_params = json_data.get("effectiveFilters", {})
query = search_params.get("query", "General web search")
formatted_text.append(
f"### Exa Search Results for: '{query}'\n\n---\n"
)
results = json_data.get("results", [])
if not results:
formatted_text.append("No results found.\n")
return "".join(formatted_text)
for i, result in enumerate(results, 1):
title = result.get("title", "No title")
url = result.get("url", result.get("id", "No URL"))
published_date = result.get("publishedDate", "")
highlights = result.get("highlights", [])
highlight_text = (
"\n".join(
(
h.get("text", str(h))
if isinstance(h, dict)
else str(h)
)
for h in highlights[:3]
)
if highlights
else "No summary available"
)
formatted_text.extend(
[
f"{i}. **{title}**\n",
f" - URL: {url}\n",
f" - Published: {published_date.split('T')[0] if published_date else 'Date unknown'}\n",
f" - Key Points:\n {highlight_text}\n\n",
]
)
return "".join(formatted_text)
# Define the research tools schema
tools = [
{
"type": "function",
"function": {
"name": "search_topic",
"description": "Conduct a thorough search on a specified topic or subtopic, generating a precise array of highly detailed search queries tailored to the input parameters.",
"parameters": {
"type": "object",
"properties": {
"depth": {
"type": "integer",
"description": "Indicates the level of thoroughness for the search. Values range from 1 to 3, where 1 signifies a superficial search and 3 indicates an in-depth exploration of the topic.",
},
"detailed_queries": {
"type": "array",
"description": "An array of specific search queries generated based on the input query and the specified depth. Each query must be crafted to elicit detailed and relevant information from various sources.",
"items": {
"type": "string",
"description": "Each item in this array must represent a unique search query targeting a specific aspect of the main topic, ensuring a comprehensive exploration of the subject matter.",
},
},
},
"required": ["depth", "detailed_queries"],
},
},
},
]
RESEARCH_AGENT_PROMPT = """
You are an advanced research agent specialized in conducting deep, comprehensive research across multiple domains.
Your task is to:
1. Break down complex topics into searchable subtopics
2. Generate diverse search queries to explore each subtopic thoroughly
3. Identify connections and patterns across different areas of research
4. Synthesize findings into coherent insights
5. Identify gaps in current knowledge and suggest areas for further investigation
For each research task:
- Consider multiple perspectives and approaches
- Look for both supporting and contradicting evidence
- Evaluate the credibility and relevance of sources
- Track emerging trends and recent developments
- Consider cross-disciplinary implications
Output Format:
- Provide structured research plans
- Include specific search queries for each subtopic
- Prioritize queries based on relevance and potential impact
- Suggest follow-up areas for deeper investigation
"""
SUMMARIZATION_AGENT_PROMPT = """
You are an expert information synthesis and summarization agent designed for producing clear, accurate, and insightful summaries of complex information. Your core capabilities include:
Core Capabilities:
- Identify and extract key concepts, themes, and insights from any given content
- Recognize patterns, relationships, and hierarchies within information
- Filter out noise while preserving crucial context and nuance
- Handle multiple sources and perspectives simultaneously
Summarization Strategy
1. Multi-level Structure
- Provide an extensive summary
- Follow with key findings
- Include detailed insights with supporting evidence
- End with implications or next steps when relevant
2. Quality Standards
- Maintain factual accuracy and precision
- Preserve important technical details and terminology
- Avoid oversimplification of complex concepts
- Include quantitative data when available
- Cite or reference specific sources when summarizing claims
3. Clarity & Accessibility
- Use clear, concise language
- Define technical terms when necessary
- Structure information logically
- Use formatting to enhance readability
- Maintain appropriate level of technical depth for the audience
4. Synthesis & Analysis
- Identify conflicting information or viewpoints
- Highlight consensus across sources
- Note gaps or limitations in the information
- Draw connections between related concepts
- Provide context for better understanding
OUTPUT REQUIREMENTS:
- Begin with a clear statement of the topic or question being addressed
- Use consistent formatting and structure
- Clearly separate different levels of detail
- Include confidence levels for conclusions when appropriate
- Note any areas requiring additional research or clarification
Remember: Your goal is to make complex information accessible while maintaining accuracy and depth. Prioritize clarity without sacrificing important nuance or detail."""
class DeepResearchSwarm:
def __init__(
self,
name: str = "DeepResearchSwarm",
description: str = "A swarm that conducts comprehensive research across multiple domains",
max_loops: int = 1,
nice_print: bool = True,
output_type: str = "json",
max_workers: int = os.cpu_count()
* 2, # Let the system decide optimal thread count
token_count: bool = False,
research_model_name: str = "gpt-4o-mini",
claude_summarization_model_name: str = "claude-3-5-sonnet-20240620",
):
self.name = name
self.description = description
self.max_loops = max_loops
self.nice_print = nice_print
self.output_type = output_type
self.max_workers = max_workers
self.research_model_name = research_model_name
self.claude_summarization_model_name = (
claude_summarization_model_name
)
self.reliability_check()
self.conversation = Conversation(token_count=token_count)
# Create a persistent ThreadPoolExecutor for the lifetime of the swarm
# This eliminates thread creation overhead on each query
self.executor = concurrent.futures.ThreadPoolExecutor(
max_workers=self.max_workers
)
# Initialize the research agent
self.research_agent = Agent(
agent_name="Deep-Research-Agent",
agent_description="Specialized agent for conducting comprehensive research across multiple domains",
system_prompt=RESEARCH_AGENT_PROMPT,
max_loops=1, # Allow multiple iterations for thorough research
tools_list_dictionary=tools,
model_name=self.research_model_name,
output_type="final",
)
self.summarization_agent = Agent(
agent_name="Summarization-Agent",
agent_description="Specialized agent for summarizing research results",
system_prompt=SUMMARIZATION_AGENT_PROMPT,
max_loops=1,
model_name=self.claude_summarization_model_name,
output_type="final",
)
def __del__(self):
"""Clean up the executor on object destruction"""
self.executor.shutdown(wait=False)
def reliability_check(self):
"""Check the reliability of the query"""
if self.max_loops < 1:
raise ValueError("max_loops must be greater than 0")
formatter.print_panel(
"DeepResearchSwarm is booting up...", "blue"
)
formatter.print_panel("Reliability check passed", "green")
def get_queries(self, query: str) -> List[str]:
"""
Generate a list of detailed search queries based on the input query.
Args:
query (str): The main research query to explore
Returns:
List[str]: A list of detailed search queries
"""
self.conversation.add(role="User", content=query)
# Get the agent's response
agent_output = self.research_agent.run(query)
# Transform the string into a list of dictionaries
agent_output = json.loads(agent_output)
print(agent_output)
print(type(agent_output))
formatter.print_panel(
f"Agent output type: {type(agent_output)} \n {agent_output}",
"blue",
)
# Convert the output to a dictionary if it's a list
if isinstance(agent_output, list):
agent_output = json.dumps(agent_output)
if isinstance(agent_output, str):
# Convert the string output to dictionary
output_dict = (
str_to_dict(agent_output)
if isinstance(agent_output, str)
else agent_output
)
# Extract the detailed queries from the output
# Search for the key "detailed_queries" in the output list[dictionary]
if isinstance(output_dict, list):
for item in output_dict:
if "detailed_queries" in item:
queries = item["detailed_queries"]
break
else:
queries = output_dict.get("detailed_queries", [])
print(queries)
# Log the number of queries generated
formatter.print_panel(
f"Generated {len(queries)} queries", "blue"
)
print(queries)
print(type(queries))
return queries
def step(self, query: str):
"""
Execute a single research step with maximum parallelism.
Args:
query (str): The research query to process
Returns:
Formatted conversation history
"""
try:
# Get all the queries to process
queries = self.get_queries(query)
print(queries)
# Submit all queries for concurrent processing
futures = []
for q in queries:
future = self.executor.submit(exa_search, q)
futures.append((q, future))
# Process results as they complete
for q, future in futures:
try:
# Get search results only
results = future.result()
# Add search results to conversation
self.conversation.add(
role="User",
content=f"Search results for {q}: \n {results}",
)
except Exception as e:
# Handle any errors in the thread
error_msg = (
f"Error processing query '{q}': {str(e)}"
)
console.print(f"[bold red]{error_msg}[/bold red]")
self.conversation.add(
role="System",
content=error_msg,
)
# Generate final comprehensive analysis after all searches are complete
try:
final_summary = self.summarization_agent.run(
f"Please generate a comprehensive 4,000-word report analyzing the following content: {self.conversation.get_str()}"
)
self.conversation.add(
role=self.summarization_agent.agent_name,
content=final_summary,
)
except Exception as e:
error_msg = (
f"Error generating final summary: {str(e)}"
)
console.print(f"[bold red]{error_msg}[/bold red]")
self.conversation.add(
role="System",
content=error_msg,
)
# Return formatted output
result = history_output_formatter(
self.conversation, type=self.output_type
)
# If output type is JSON, ensure it's properly formatted
if self.output_type.lower() == "json":
try:
import json
if isinstance(result, str):
# Try to parse and reformat for pretty printing
parsed = json.loads(result)
return json.dumps(
parsed, indent=2, ensure_ascii=False
)
except (json.JSONDecodeError, TypeError):
# If parsing fails, return as-is
pass
return result
except Exception as e:
error_msg = f"Critical error in step execution: {str(e)}"
console.print(f"[bold red]{error_msg}[/bold red]")
return (
{"error": error_msg}
if self.output_type.lower() == "json"
else error_msg
)
def run(self, task: str):
return self.step(task)
def batched_run(self, tasks: List[str]):
"""
Execute a list of research tasks in parallel.
Args:
tasks (List[str]): A list of research tasks to execute
Returns:
List[str]: A list of formatted conversation histories
"""
futures = []
for task in tasks:
future = self.executor.submit(self.step, task)
futures.append((task, future))
# Example usage
# if __name__ == "__main__":
# try:
# swarm = DeepResearchSwarm(
# output_type="json",
# )
# result = swarm.step(
# "What is the active tariff situation with mexico? Only create 2 queries"
# )
# # Parse and display results in rich format with markdown export
# swarm.parse_and_display_results(result, export_markdown=True)
# except Exception as e:
# print(f"Error running deep research swarm: {str(e)}")
# import traceback
# traceback.print_exc()

@ -1550,7 +1550,7 @@ Always explain your refinements and how they address the evaluation feedback.
eval_message = f"Evaluate the following content based on {criterion} criterion"
eval_background = f"Evaluation criterion: {criterion}\nContent to evaluate: {content}"
structured_msg = self.send_structured_message(
self.send_structured_message(
sender=self.evaluation_supervisor_name,
recipient=(
evaluator.agent_name
@ -1684,7 +1684,7 @@ Always explain your refinements and how they address the evaluation feedback.
message = f"Generate content for the following task: {task}"
background = f"Task context: {task}\n\nProvide comprehensive, well-structured content."
structured_msg = self.send_structured_message(
self.send_structured_message(
sender=self.supervisor_name,
recipient=(
generator.agent_name
@ -1755,7 +1755,7 @@ Always explain your refinements and how they address the evaluation feedback.
)
background = f"Original content: {original_content}\n\nEvaluation feedback:\n{feedback_summary}"
structured_msg = self.send_structured_message(
self.send_structured_message(
sender=self.supervisor_name,
recipient=(
refiner.agent_name

@ -621,10 +621,6 @@ class SwarmMatcher:
name="ConsensusSwarm",
description="Achieve group decisions through consensus mechanisms and voting protocols among multiple agents. Keywords: group decision making, voting systems, collective intelligence, agreement protocols, democratic processes, collaborative decisions, consensus building",
),
SwarmType(
name="DeepResearchSwarm",
description="Conduct in-depth research and analysis by coordinating multiple agents to explore, synthesize, and validate information from various sources. Keywords: research methodology, information synthesis, data validation, comprehensive analysis, knowledge discovery, systematic investigation",
),
SwarmType(
name="CouncilAsAJudge",
description="Evaluate and judge solutions or decisions through a council of expert agents acting as arbitrators. Keywords: evaluation, judgment, arbitration, expert assessment, quality control, decision validation, peer review, consensus building",

@ -10,10 +10,9 @@ from swarms.prompts.multi_agent_collab_prompt import (
MULTI_AGENT_COLLAB_PROMPT_TWO,
)
from swarms.structs.agent import Agent
from swarms.structs.agent_rearrange import AgentRearrange
from swarms.structs.concurrent_workflow import ConcurrentWorkflow
from swarms.structs.council_as_judge import CouncilAsAJudge
from swarms.structs.csv_to_agent import AgentLoader
from swarms.structs.deep_research_swarm import DeepResearchSwarm
from swarms.structs.groupchat import GroupChat
from swarms.structs.heavy_swarm import HeavySwarm
from swarms.structs.hiearchical_swarm import HierarchicalSwarm
@ -23,7 +22,6 @@ from swarms.structs.majority_voting import MajorityVoting
from swarms.structs.malt import MALT
from swarms.structs.mixture_of_agents import MixtureOfAgents
from swarms.structs.multi_agent_router import MultiAgentRouter
from swarms.structs.agent_rearrange import AgentRearrange
from swarms.structs.sequential_workflow import SequentialWorkflow
from swarms.structs.swarm_matcher import swarm_matcher
from swarms.telemetry.log_executions import log_execution
@ -45,7 +43,6 @@ SwarmType = Literal[
"auto",
"MajorityVoting",
"MALT",
"DeepResearchSwarm",
"CouncilAsAJudge",
"InteractiveGroupChat",
"HeavySwarm",
@ -288,12 +285,6 @@ class SwarmRouter:
self.setup()
# Load agents from CSV
if self.load_agents_from_csv:
self.agents = AgentLoader(
csv_path=self.csv_file_path
).load_agents()
if self.telemetry_enabled:
self.agent_config = self.agent_config()
@ -387,7 +378,6 @@ class SwarmRouter:
"MALT": self._create_malt,
"CouncilAsAJudge": self._create_council_as_judge,
"InteractiveGroupChat": self._create_interactive_group_chat,
"DeepResearchSwarm": self._create_deep_research_swarm,
"HiearchicalSwarm": self._create_hierarchical_swarm,
"MixtureOfAgents": self._create_mixture_of_agents,
"MajorityVoting": self._create_majority_voting,
@ -455,16 +445,6 @@ class SwarmRouter:
speaker_function=self.speaker_function,
)
def _create_deep_research_swarm(self, *args, **kwargs):
"""Factory function for DeepResearchSwarm."""
return DeepResearchSwarm(
name=self.name,
description=self.description,
agents=self.agents,
max_loops=self.max_loops,
output_type=self.output_type,
)
def _create_hierarchical_swarm(self, *args, **kwargs):
"""Factory function for HierarchicalSwarm."""
return HierarchicalSwarm(

@ -21,9 +21,7 @@ from swarms.utils.history_output_formatter import (
history_output_formatter,
)
from swarms.utils.agent_loader import (
AgentLoader,
MarkdownAgentConfig,
from swarms.utils.agent_loader_markdown import (
load_agent_from_markdown,
load_agents_from_markdown,
)
@ -51,8 +49,6 @@ __all__ = [
"HistoryOutputType",
"history_output_formatter",
"check_all_model_max_tokens",
"AgentLoader",
"MarkdownAgentConfig",
"load_agent_from_markdown",
"load_agents_from_markdown",
"dynamic_auto_chunking",

@ -1,14 +1,15 @@
import os
import yaml
from pathlib import Path
from typing import Any, Dict, List, Optional, Union, TYPE_CHECKING
from concurrent.futures import (
ThreadPoolExecutor,
as_completed,
TimeoutError,
as_completed,
)
from pydantic import BaseModel, Field, field_validator
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
import yaml
from loguru import logger
from pydantic import BaseModel, Field, field_validator
# Type checking imports to avoid circular dependency
if TYPE_CHECKING:
@ -407,14 +408,39 @@ class AgentLoader:
# Convenience functions
def load_agent_from_markdown(file_path: str, **kwargs) -> "Agent":
"""
Load a single agent from a markdown file with Claude Code YAML frontmatter format.
Load a single agent from a markdown file using the Claude Code YAML frontmatter format.
This function provides a simple interface for loading an agent configuration
from a markdown file. It supports all configuration overrides accepted by the
underlying `AgentLoader` and agent class.
Args:
file_path: Path to markdown file with YAML frontmatter
**kwargs: Additional configuration overrides
file_path (str): Path to the markdown file containing YAML frontmatter
with agent configuration.
**kwargs: Optional keyword arguments to override agent configuration
parameters. Common options include:
- max_loops (int): Maximum number of reasoning loops.
- autosave (bool): Enable automatic state saving.
- dashboard (bool): Enable dashboard monitoring.
- verbose (bool): Enable verbose logging.
- dynamic_temperature_enabled (bool): Enable dynamic temperature.
- saved_state_path (str): Path for saving agent state.
- user_name (str): User identifier.
- retry_attempts (int): Number of retry attempts.
- context_length (int): Maximum context length.
- return_step_meta (bool): Return step metadata.
- output_type (str): Output format type.
- auto_generate_prompt (bool): Auto-generate prompts.
- artifacts_on (bool): Enable artifacts.
- streaming_on (bool): Enable streaming output.
- mcp_url (str): MCP server URL if needed.
Returns:
Configured Agent instance
Agent: Configured Agent instance loaded from the markdown file.
Example:
>>> agent = load_agent_from_markdown("finance_advisor.md", max_loops=3, verbose=True)
>>> response = agent.run("What is the best investment strategy for 2024?")
"""
# Lazy import to avoid circular dependency
@ -429,16 +455,36 @@ def load_agents_from_markdown(
**kwargs,
) -> List["Agent"]:
"""
Load multiple agents from markdown files with Claude Code YAML frontmatter format.
Load multiple agents from markdown files using the Claude Code YAML frontmatter format.
This function supports loading agents from a list of markdown files or from all
markdown files in a directory. It can process files concurrently for faster loading,
and allows configuration overrides for all loaded agents.
Args:
file_paths: Directory path or list of file paths with YAML frontmatter
concurrent: Whether to use concurrent processing for multiple files
max_file_size_mb: Maximum file size in MB to prevent memory issues
**kwargs: Additional configuration overrides
file_paths (Union[str, List[str]]): Either a directory path containing markdown
files or a list of markdown file paths to load.
concurrent (bool, optional): If True, enables concurrent processing for faster
loading of multiple files. Defaults to True.
max_file_size_mb (float, optional): Maximum file size (in MB) for each markdown
file to prevent memory issues. Files exceeding this size will be skipped.
Defaults to 10.0.
**kwargs: Optional keyword arguments to override agent configuration
parameters for all loaded agents. See `load_agent_from_markdown` for
available options.
Returns:
List of configured Agent instances
List[Agent]: List of configured Agent instances loaded from the markdown files.
Example:
>>> agents = load_agents_from_markdown(
... ["agent1.md", "agent2.md"],
... concurrent=True,
... max_loops=2,
... verbose=True
... )
>>> for agent in agents:
... print(agent.name)
"""
# Lazy import to avoid circular dependency
Loading…
Cancel
Save