Merge pull request #873 from harshalmore31/Fix/deep_research_swarm

Fixed #865 DeepResearchSwarm
pull/875/head
Kye Gomez 3 weeks ago committed by GitHub
commit 86facfd32a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,12 +1,18 @@
import asyncio import asyncio
import concurrent.futures import concurrent.futures
import json
import os import os
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Tuple from typing import Any, Dict, List, Tuple
import aiohttp import aiohttp
from dotenv import load_dotenv from dotenv import load_dotenv
from rich.console import Console from rich.console import Console
from rich.panel import Panel
from rich.text import Text
from rich.tree import Tree
from swarms.agents.reasoning_duo import ReasoningDuo from swarms.agents.reasoning_duo import ReasoningDuo
from swarms.structs.agent import Agent from swarms.structs.agent import Agent
@ -116,21 +122,37 @@ async def _async_exa_search(
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Asynchronous helper function for Exa.ai API requests""" """Asynchronous helper function for Exa.ai API requests"""
api_url = "https://api.exa.ai/search" api_url = "https://api.exa.ai/search"
# Check if API key is available
api_key = os.getenv("EXA_API_KEY")
if not api_key:
return {"error": "EXA_API_KEY environment variable not set"}
headers = { headers = {
"x-api-key": os.getenv("EXA_API_KEY"), "x-api-key": api_key,
"Content-Type": "application/json", "Content-Type": "application/json",
} }
# Filter out None keys AND None values from kwargs
safe_kwargs = {
str(k): v for k, v in kwargs.items()
if k is not None and v is not None and str(k) != "None"
}
payload = { payload = {
"query": query, "query": query,
"useAutoprompt": True, "useAutoprompt": True,
"numResults": kwargs.get("num_results", 10), "numResults": safe_kwargs.get("num_results", 10),
"contents": { "contents": {
"text": True, "text": True,
"highlights": {"numSentences": 2}, "highlights": {"numSentences": 2},
}, },
**kwargs,
} }
# Only add safe_kwargs if they don't conflict with existing keys
for key, value in safe_kwargs.items():
if key not in payload and key not in ["query", "useAutoprompt", "numResults", "contents"]:
payload[key] = value
try: try:
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
@ -370,24 +392,20 @@ class DeepResearchSwarm:
return [] return []
def _process_query(self, query: str) -> Tuple[str, str]: def _process_query(self, query: str) -> str:
""" """
Process a single query with search and reasoning. Process a single query with search only.
This function is designed to be run in a separate thread. This function is designed to be run in a separate thread.
Args: Args:
query (str): The query to process query (str): The query to process
Returns: Returns:
Tuple[str, str]: A tuple containing (search_results, reasoning_output) str: Search results
""" """
# Run the search # Run the search only - no individual reasoning to avoid duplication
results = exa_search(query) results = exa_search(query)
return results
# Run the reasoning on the search results
reasoning_output = self.reasoning_duo.run(results)
return (results, reasoning_output)
def step(self, query: str): def step(self, query: str):
""" """
@ -399,54 +417,85 @@ class DeepResearchSwarm:
Returns: Returns:
Formatted conversation history Formatted conversation history
""" """
# Get all the queries to process try:
queries = self.get_queries(query) # Get all the queries to process
queries = self.get_queries(query)
# Submit all queries for concurrent processing
# Using a list instead of generator for clearer debugging if not queries:
futures = [] error_msg = "No queries generated. Please check your input."
for q in queries: self.conversation.add(role="System", content=error_msg)
future = self.executor.submit(self._process_query, q) return history_output_formatter(
futures.append((q, future)) self.conversation, type=self.output_type
)
# Process results as they complete (no waiting for slower queries) # Submit all queries for concurrent processing
for q, future in futures: futures = []
for q in queries:
future = self.executor.submit(self._process_query, q)
futures.append((q, future))
# Process results as they complete
for q, future in futures:
try:
# Get search results only
results = future.result()
# Add search results to conversation
self.conversation.add(
role="User",
content=f"Search results for {q}: \n {results}",
)
except Exception as e:
# Handle any errors in the thread
error_msg = f"Error processing query '{q}': {str(e)}"
console.print(f"[bold red]{error_msg}[/bold red]")
self.conversation.add(
role="System",
content=error_msg,
)
# Generate final comprehensive analysis after all searches are complete
try: try:
# Get results (blocks until this specific future is done) final_summary = self.reasoning_duo.run(
results, reasoning_output = future.result() f"Generate an extensive report of the following content: {self.conversation.get_str()}"
# Add search results to conversation
self.conversation.add(
role="User",
content=f"Search results for {q}: \n {results}",
) )
# Add reasoning output to conversation
self.conversation.add( self.conversation.add(
role=self.reasoning_duo.agent_name, role=self.reasoning_duo.agent_name,
content=reasoning_output, content=final_summary,
) )
except Exception as e: except Exception as e:
# Handle any errors in the thread error_msg = f"Error generating final summary: {str(e)}"
console.print(f"[bold red]{error_msg}[/bold red]")
self.conversation.add( self.conversation.add(
role="System", role="System",
content=f"Error processing query '{q}': {str(e)}", content=error_msg,
) )
# Once all query processing is complete, generate the final summary # Return formatted output
# This step runs after all queries to ensure it summarizes all results result = history_output_formatter(
final_summary = self.reasoning_duo.run( self.conversation, type=self.output_type
f"Generate an extensive report of the following content: {self.conversation.get_str()}" )
)
# If output type is JSON, ensure it's properly formatted
self.conversation.add( if self.output_type.lower() == "json":
role=self.reasoning_duo.agent_name, try:
content=final_summary, import json
) if isinstance(result, str):
# Try to parse and reformat for pretty printing
return history_output_formatter( parsed = json.loads(result)
self.conversation, type=self.output_type return json.dumps(parsed, indent=2, ensure_ascii=False)
) except (json.JSONDecodeError, TypeError):
# If parsing fails, return as-is
pass
return result
except Exception as e:
error_msg = f"Critical error in step execution: {str(e)}"
console.print(f"[bold red]{error_msg}[/bold red]")
return {"error": error_msg} if self.output_type.lower() == "json" else error_msg
def run(self, task: str): def run(self, task: str):
return self.step(task) return self.step(task)
@ -466,14 +515,113 @@ class DeepResearchSwarm:
future = self.executor.submit(self.step, task) future = self.executor.submit(self.step, task)
futures.append((task, future)) futures.append((task, future))
def parse_and_display_results(self, json_result: str, export_markdown: bool = True):
"""
Parse JSON results and display in rich format with optional markdown export.
Args:
json_result (str): JSON string containing conversation results
export_markdown (bool): Whether to export to markdown file
"""
try:
# Parse JSON
data = json.loads(json_result)
# Create rich display
console.print("\n" + "="*100, style="cyan")
console.print("🔬 DEEP RESEARCH RESULTS", style="bold cyan", justify="center")
console.print("="*100, style="cyan")
# Create conversation tree
tree = Tree("🗣️ Research Conversation", style="bold blue")
markdown_content = ["# Deep Research Results\n", f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"]
for i, entry in enumerate(data, 1):
if isinstance(entry, dict):
role = entry.get('role', 'Unknown')
content = entry.get('content', '')
timestamp = entry.get('timestamp', '')
# Get role info for display
role_info = self._get_role_display_info(role)
# Create tree branch
branch_text = f"{role_info['emoji']} {role}"
if timestamp:
time_part = timestamp.split()[-1] if ' ' in timestamp else timestamp[-8:]
branch_text += f" ({time_part})"
branch = tree.add(branch_text, style=role_info['style'])
# Add content preview to tree
content_preview = content[:150] + "..." if len(content) > 150 else content
content_preview = content_preview.replace('\n', ' ')
branch.add(content_preview, style="dim")
# Add to markdown
markdown_content.append(f"\n## {i}. {role}")
if timestamp:
markdown_content.append(f"**Timestamp:** {timestamp}")
markdown_content.append(f"\n{content}\n")
# Display full content for important entries
if role.lower() in ['reasoning-agent-01'] and len(content) > 300:
console.print(f"\n📋 {role} Full Response:", style="bold green")
console.print(Panel(content, border_style="green", title=f"{role} Analysis"))
# Display the tree
console.print(tree)
# Export to markdown if requested
if export_markdown:
# Create deepsearch_results directory
results_dir = Path("deepsearch_results")
results_dir.mkdir(exist_ok=True)
# Generate filename with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = results_dir / f"research_results_{timestamp}.md"
# Write markdown file
with open(filename, 'w', encoding='utf-8') as f:
f.write('\n'.join(markdown_content))
console.print(f"\n💾 Results exported to: {filename}", style="bold green")
console.print("\n✅ Research analysis complete!", style="bold cyan")
except json.JSONDecodeError as e:
console.print(f"❌ Error parsing JSON: {e}", style="red")
except Exception as e:
console.print(f"❌ Error displaying results: {e}", style="red")
def _get_role_display_info(self, role: str) -> Dict[str, str]:
"""Get display information for different conversation roles."""
role_map = {
"user": {"emoji": "👤", "style": "cyan"},
"deep-research-agent": {"emoji": "🔍", "style": "blue"},
"reasoning-agent-01": {"emoji": "🧠", "style": "magenta"},
"system": {"emoji": "⚙️", "style": "yellow"},
}
role_lower = role.lower()
return role_map.get(role_lower, {"emoji": "🤖", "style": "white"})
# # Example usage # Example usage
# if __name__ == "__main__": # if __name__ == "__main__":
# swarm = DeepResearchSwarm( # try:
# output_type="json", # swarm = DeepResearchSwarm(
# ) # output_type="json",
# print( # )
# swarm.step( # result = swarm.step(
# "What is the active tarrif situation with mexico? Only create 2 queries" # "What is the active tariff situation with mexico? Only create 2 queries"
# ) # )
# )
# # Parse and display results in rich format with markdown export
# swarm.parse_and_display_results(result, export_markdown=True)
# except Exception as e:
# print(f"Error running deep research swarm: {str(e)}")
# import traceback
# traceback.print_exc()

Loading…
Cancel
Save