[FEAT][DeepResearchSwarm] Enhance API search functionality and add result parsing/displaying features

pull/873/head
harshalmore31 3 weeks ago
parent 156f98a2c2
commit bf26ade931

@ -1,12 +1,18 @@
import asyncio
import concurrent.futures
import json
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Tuple
import aiohttp
from dotenv import load_dotenv
from rich.console import Console
from rich.panel import Panel
from rich.text import Text
from rich.tree import Tree
from swarms.agents.reasoning_duo import ReasoningDuo
from swarms.structs.agent import Agent
@ -116,21 +122,37 @@ async def _async_exa_search(
) -> Dict[str, Any]:
"""Asynchronous helper function for Exa.ai API requests"""
api_url = "https://api.exa.ai/search"
# Check if API key is available
api_key = os.getenv("EXA_API_KEY")
if not api_key:
return {"error": "EXA_API_KEY environment variable not set"}
headers = {
"x-api-key": os.getenv("EXA_API_KEY"),
"x-api-key": api_key,
"Content-Type": "application/json",
}
# Filter out None keys AND None values from kwargs
safe_kwargs = {
str(k): v for k, v in kwargs.items()
if k is not None and v is not None and str(k) != "None"
}
payload = {
"query": query,
"useAutoprompt": True,
"numResults": kwargs.get("num_results", 10),
"numResults": safe_kwargs.get("num_results", 10),
"contents": {
"text": True,
"highlights": {"numSentences": 2},
},
**kwargs,
}
# Only add safe_kwargs if they don't conflict with existing keys
for key, value in safe_kwargs.items():
if key not in payload and key not in ["query", "useAutoprompt", "numResults", "contents"]:
payload[key] = value
try:
async with aiohttp.ClientSession() as session:
@ -370,24 +392,20 @@ class DeepResearchSwarm:
return []
def _process_query(self, query: str) -> Tuple[str, str]:
def _process_query(self, query: str) -> str:
"""
Process a single query with search and reasoning.
Process a single query with search only.
This function is designed to be run in a separate thread.
Args:
query (str): The query to process
Returns:
Tuple[str, str]: A tuple containing (search_results, reasoning_output)
str: Search results
"""
# Run the search
# Run the search only - no individual reasoning to avoid duplication
results = exa_search(query)
# Run the reasoning on the search results
reasoning_output = self.reasoning_duo.run(results)
return (results, reasoning_output)
return results
def step(self, query: str):
"""
@ -399,54 +417,85 @@ class DeepResearchSwarm:
Returns:
Formatted conversation history
"""
# Get all the queries to process
queries = self.get_queries(query)
# Submit all queries for concurrent processing
# Using a list instead of generator for clearer debugging
futures = []
for q in queries:
future = self.executor.submit(self._process_query, q)
futures.append((q, future))
try:
# Get all the queries to process
queries = self.get_queries(query)
if not queries:
error_msg = "No queries generated. Please check your input."
self.conversation.add(role="System", content=error_msg)
return history_output_formatter(
self.conversation, type=self.output_type
)
# Process results as they complete (no waiting for slower queries)
for q, future in futures:
# Submit all queries for concurrent processing
futures = []
for q in queries:
future = self.executor.submit(self._process_query, q)
futures.append((q, future))
# Process results as they complete
for q, future in futures:
try:
# Get search results only
results = future.result()
# Add search results to conversation
self.conversation.add(
role="User",
content=f"Search results for {q}: \n {results}",
)
except Exception as e:
# Handle any errors in the thread
error_msg = f"Error processing query '{q}': {str(e)}"
console.print(f"[bold red]{error_msg}[/bold red]")
self.conversation.add(
role="System",
content=error_msg,
)
# Generate final comprehensive analysis after all searches are complete
try:
# Get results (blocks until this specific future is done)
results, reasoning_output = future.result()
# Add search results to conversation
self.conversation.add(
role="User",
content=f"Search results for {q}: \n {results}",
final_summary = self.reasoning_duo.run(
f"Generate an extensive report of the following content: {self.conversation.get_str()}"
)
# Add reasoning output to conversation
self.conversation.add(
role=self.reasoning_duo.agent_name,
content=reasoning_output,
content=final_summary,
)
except Exception as e:
# Handle any errors in the thread
error_msg = f"Error generating final summary: {str(e)}"
console.print(f"[bold red]{error_msg}[/bold red]")
self.conversation.add(
role="System",
content=f"Error processing query '{q}': {str(e)}",
content=error_msg,
)
# Once all query processing is complete, generate the final summary
# This step runs after all queries to ensure it summarizes all results
final_summary = self.reasoning_duo.run(
f"Generate an extensive report of the following content: {self.conversation.get_str()}"
)
self.conversation.add(
role=self.reasoning_duo.agent_name,
content=final_summary,
)
return history_output_formatter(
self.conversation, type=self.output_type
)
# Return formatted output
result = history_output_formatter(
self.conversation, type=self.output_type
)
# If output type is JSON, ensure it's properly formatted
if self.output_type.lower() == "json":
try:
import json
if isinstance(result, str):
# Try to parse and reformat for pretty printing
parsed = json.loads(result)
return json.dumps(parsed, indent=2, ensure_ascii=False)
except (json.JSONDecodeError, TypeError):
# If parsing fails, return as-is
pass
return result
except Exception as e:
error_msg = f"Critical error in step execution: {str(e)}"
console.print(f"[bold red]{error_msg}[/bold red]")
return {"error": error_msg} if self.output_type.lower() == "json" else error_msg
def run(self, task: str):
return self.step(task)
@ -466,14 +515,114 @@ class DeepResearchSwarm:
future = self.executor.submit(self.step, task)
futures.append((task, future))
def parse_and_display_results(self, json_result: str, export_markdown: bool = True):
    """
    Parse JSON conversation results and display them in rich format,
    optionally exporting a markdown report.

    Args:
        json_result (str): JSON string containing conversation results
            (expected to decode to a list of dicts with "role",
            "content", and optional "timestamp" keys).
        export_markdown (bool): Whether to export to a markdown file
            under ``deepsearch_results/``.
    """
    try:
        # Parse JSON; a malformed string is reported in the
        # JSONDecodeError handler below rather than raised.
        data = json.loads(json_result)

        # Header banner
        console.print("\n" + "="*100, style="cyan")
        console.print("🔬 DEEP RESEARCH RESULTS", style="bold cyan", justify="center")
        console.print("="*100, style="cyan")

        # Conversation tree plus accumulated markdown lines for export
        tree = Tree("🗣️ Research Conversation", style="bold blue")
        markdown_content = [
            "# Deep Research Results\n",
            f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n",
        ]

        for i, entry in enumerate(data, 1):
            # Non-dict entries are silently skipped
            if isinstance(entry, dict):
                role = entry.get('role', 'Unknown')
                content = entry.get('content', '')
                timestamp = entry.get('timestamp', '')

                # Emoji/style pair for this role
                role_info = self._get_role_display_info(role)

                # Tree branch label: emoji, role, and (if present) the
                # time-of-day portion of the timestamp
                branch_text = f"{role_info['emoji']} {role}"
                if timestamp:
                    time_part = timestamp.split()[-1] if ' ' in timestamp else timestamp[-8:]
                    branch_text += f" ({time_part})"

                branch = tree.add(branch_text, style=role_info['style'])

                # Single-line content preview, truncated to 150 chars
                content_preview = content[:150] + "..." if len(content) > 150 else content
                content_preview = content_preview.replace('\n', ' ')
                branch.add(content_preview, style="dim")

                # Full content always goes into the markdown export
                markdown_content.append(f"\n## {i}. {role}")
                if timestamp:
                    markdown_content.append(f"**Timestamp:** {timestamp}")
                markdown_content.append(f"\n{content}\n")

                # Show long reasoning-agent responses in full on console
                if role.lower() in ['reasoning-agent-01'] and len(content) > 300:
                    console.print(f"\n📋 {role} Full Response:", style="bold green")
                    console.print(Panel(content, border_style="green", title=f"{role} Analysis"))

        # Display the tree
        console.print(tree)

        # Export to markdown if requested
        if export_markdown:
            # Create deepsearch_results directory
            results_dir = Path("deepsearch_results")
            results_dir.mkdir(exist_ok=True)

            # Generate filename with timestamp
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = results_dir / f"research_results_{timestamp}.md"

            # Write markdown file
            with open(filename, 'w', encoding='utf-8') as f:
                f.write('\n'.join(markdown_content))

            # Fix: interpolate the actual export path (the message
            # previously did not reference the computed filename)
            console.print(f"\n💾 Results exported to: {filename}", style="bold green")

        console.print("\n✅ Research analysis complete!", style="bold cyan")

    except json.JSONDecodeError as e:
        console.print(f"❌ Error parsing JSON: {e}", style="red")
    except Exception as e:
        console.print(f"❌ Error displaying results: {e}", style="red")
def _get_role_display_info(self, role: str) -> Dict[str, str]:
"""Get display information for different conversation roles."""
role_map = {
"user": {"emoji": "👤", "style": "cyan"},
"deep-research-agent": {"emoji": "🔍", "style": "blue"},
"reasoning-agent-01": {"emoji": "🧠", "style": "magenta"},
"system": {"emoji": "⚙️", "style": "yellow"},
}
role_lower = role.lower()
return role_map.get(role_lower, {"emoji": "🤖", "style": "white"})
# # Example usage
# if __name__ == "__main__":
# swarm = DeepResearchSwarm(
# output_type="json",
# )
# print(
# swarm.step(
# "What is the active tariff situation with mexico? Only create 2 queries"
# )
# )
# Example usage: run one research step and render the results.
if __name__ == "__main__":
    try:
        research_swarm = DeepResearchSwarm(output_type="json")

        research_output = research_swarm.step(
            "What is the active tariff situation with mexico? Only create 2 queries"
        )

        # Parse and display results in rich format with markdown export
        research_swarm.parse_and_display_results(
            research_output, export_markdown=True
        )
    except Exception as e:
        print(f"Error running deep research swarm: {str(e)}")
        import traceback

        traceback.print_exc()

Loading…
Cancel
Save