diff --git a/swarms/structs/deep_research_swarm.py b/swarms/structs/deep_research_swarm.py index b5237ea1..530dc2ff 100644 --- a/swarms/structs/deep_research_swarm.py +++ b/swarms/structs/deep_research_swarm.py @@ -1,12 +1,18 @@ import asyncio import concurrent.futures +import json import os from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime +from pathlib import Path from typing import Any, Dict, List, Tuple import aiohttp from dotenv import load_dotenv from rich.console import Console +from rich.panel import Panel +from rich.text import Text +from rich.tree import Tree from swarms.agents.reasoning_duo import ReasoningDuo from swarms.structs.agent import Agent @@ -116,21 +122,37 @@ async def _async_exa_search( ) -> Dict[str, Any]: """Asynchronous helper function for Exa.ai API requests""" api_url = "https://api.exa.ai/search" + + # Check if API key is available + api_key = os.getenv("EXA_API_KEY") + if not api_key: + return {"error": "EXA_API_KEY environment variable not set"} + headers = { - "x-api-key": os.getenv("EXA_API_KEY"), + "x-api-key": api_key, "Content-Type": "application/json", } + # Filter out None keys AND None values from kwargs + safe_kwargs = { + str(k): v for k, v in kwargs.items() + if k is not None and v is not None and str(k) != "None" + } + payload = { "query": query, "useAutoprompt": True, - "numResults": kwargs.get("num_results", 10), + "numResults": safe_kwargs.get("num_results", 10), "contents": { "text": True, "highlights": {"numSentences": 2}, }, - **kwargs, } + + # Only add safe_kwargs if they don't conflict with existing keys + for key, value in safe_kwargs.items(): + if key not in payload and key not in ["query", "useAutoprompt", "numResults", "contents"]: + payload[key] = value try: async with aiohttp.ClientSession() as session: @@ -370,24 +392,20 @@ class DeepResearchSwarm: return [] - def _process_query(self, query: str) -> Tuple[str, str]: + def _process_query(self, query: str) -> str: 
""" - Process a single query with search and reasoning. + Process a single query with search only. This function is designed to be run in a separate thread. Args: query (str): The query to process Returns: - Tuple[str, str]: A tuple containing (search_results, reasoning_output) + str: Search results """ - # Run the search + # Run the search only - no individual reasoning to avoid duplication results = exa_search(query) - - # Run the reasoning on the search results - reasoning_output = self.reasoning_duo.run(results) - - return (results, reasoning_output) + return results def step(self, query: str): """ @@ -399,54 +417,85 @@ class DeepResearchSwarm: Returns: Formatted conversation history """ - # Get all the queries to process - queries = self.get_queries(query) - - # Submit all queries for concurrent processing - # Using a list instead of generator for clearer debugging - futures = [] - for q in queries: - future = self.executor.submit(self._process_query, q) - futures.append((q, future)) + try: + # Get all the queries to process + queries = self.get_queries(query) + + if not queries: + error_msg = "No queries generated. Please check your input." 
+ self.conversation.add(role="System", content=error_msg) + return history_output_formatter( + self.conversation, type=self.output_type + ) - # Process results as they complete (no waiting for slower queries) - for q, future in futures: + # Submit all queries for concurrent processing + futures = [] + for q in queries: + future = self.executor.submit(self._process_query, q) + futures.append((q, future)) + + # Process results as they complete + for q, future in futures: + try: + # Get search results only + results = future.result() + + # Add search results to conversation + self.conversation.add( + role="User", + content=f"Search results for {q}: \n {results}", + ) + + except Exception as e: + # Handle any errors in the thread + error_msg = f"Error processing query '{q}': {str(e)}" + console.print(f"[bold red]{error_msg}[/bold red]") + self.conversation.add( + role="System", + content=error_msg, + ) + + # Generate final comprehensive analysis after all searches are complete try: - # Get results (blocks until this specific future is done) - results, reasoning_output = future.result() - - # Add search results to conversation - self.conversation.add( - role="User", - content=f"Search results for {q}: \n {results}", + final_summary = self.reasoning_duo.run( + f"Generate an extensive report of the following content: {self.conversation.get_str()}" ) - # Add reasoning output to conversation self.conversation.add( role=self.reasoning_duo.agent_name, - content=reasoning_output, + content=final_summary, ) except Exception as e: - # Handle any errors in the thread + error_msg = f"Error generating final summary: {str(e)}" + console.print(f"[bold red]{error_msg}[/bold red]") self.conversation.add( role="System", - content=f"Error processing query '{q}': {str(e)}", + content=error_msg, ) - # Once all query processing is complete, generate the final summary - # This step runs after all queries to ensure it summarizes all results - final_summary = self.reasoning_duo.run( - 
f"Generate an extensive report of the following content: {self.conversation.get_str()}" - ) - - self.conversation.add( - role=self.reasoning_duo.agent_name, - content=final_summary, - ) - - return history_output_formatter( - self.conversation, type=self.output_type - ) + # Return formatted output + result = history_output_formatter( + self.conversation, type=self.output_type + ) + + # If output type is JSON, ensure it's properly formatted + if self.output_type.lower() == "json": + try: + import json + if isinstance(result, str): + # Try to parse and reformat for pretty printing + parsed = json.loads(result) + return json.dumps(parsed, indent=2, ensure_ascii=False) + except (json.JSONDecodeError, TypeError): + # If parsing fails, return as-is + pass + + return result + + except Exception as e: + error_msg = f"Critical error in step execution: {str(e)}" + console.print(f"[bold red]{error_msg}[/bold red]") + return {"error": error_msg} if self.output_type.lower() == "json" else error_msg def run(self, task: str): return self.step(task) @@ -466,14 +515,113 @@ class DeepResearchSwarm: future = self.executor.submit(self.step, task) futures.append((task, future)) + def parse_and_display_results(self, json_result: str, export_markdown: bool = True): + """ + Parse JSON results and display in rich format with optional markdown export. 
+ + Args: + json_result (str): JSON string containing conversation results + export_markdown (bool): Whether to export to markdown file + """ + try: + # Parse JSON + data = json.loads(json_result) + + # Create rich display + console.print("\n" + "="*100, style="cyan") + console.print("šŸ”¬ DEEP RESEARCH RESULTS", style="bold cyan", justify="center") + console.print("="*100, style="cyan") + + # Create conversation tree + tree = Tree("šŸ—£ļø Research Conversation", style="bold blue") + markdown_content = ["# Deep Research Results\n", f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"] + + for i, entry in enumerate(data, 1): + if isinstance(entry, dict): + role = entry.get('role', 'Unknown') + content = entry.get('content', '') + timestamp = entry.get('timestamp', '') + + # Get role info for display + role_info = self._get_role_display_info(role) + + # Create tree branch + branch_text = f"{role_info['emoji']} {role}" + if timestamp: + time_part = timestamp.split()[-1] if ' ' in timestamp else timestamp[-8:] + branch_text += f" ({time_part})" + + branch = tree.add(branch_text, style=role_info['style']) + + # Add content preview to tree + content_preview = content[:150] + "..." if len(content) > 150 else content + content_preview = content_preview.replace('\n', ' ') + branch.add(content_preview, style="dim") + + # Add to markdown + markdown_content.append(f"\n## {i}. 
 {role}") + if timestamp: + markdown_content.append(f"**Timestamp:** {timestamp}") + markdown_content.append(f"\n{content}\n") + + # Display full content for important entries + if role.lower() in ['reasoning-agent-01'] and len(content) > 300: + console.print(f"\nšŸ“‹ {role} Full Response:", style="bold green") + console.print(Panel(content, border_style="green", title=f"{role} Analysis")) + + # Display the tree + console.print(tree) + + # Export to markdown if requested + if export_markdown: + # Create deepsearch_results directory + results_dir = Path("deepsearch_results") + results_dir.mkdir(exist_ok=True) + + # Generate filename with timestamp + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = results_dir / f"research_results_{timestamp}.md" + + # Write markdown file + with open(filename, 'w', encoding='utf-8') as f: + f.write('\n'.join(markdown_content)) + + console.print(f"\nšŸ’¾ Results exported to: {filename}", style="bold green") + + console.print("\nāœ… Research analysis complete!", style="bold cyan") + + except json.JSONDecodeError as e: + console.print(f"āŒ Error parsing JSON: {e}", style="red") + except Exception as e: + console.print(f"āŒ Error displaying results: {e}", style="red") + + def _get_role_display_info(self, role: str) -> Dict[str, str]: + """Get display information for different conversation roles.""" + role_map = { + "user": {"emoji": "šŸ‘¤", "style": "cyan"}, + "deep-research-agent": {"emoji": "šŸ”", "style": "blue"}, + "reasoning-agent-01": {"emoji": "🧠", "style": "magenta"}, + "system": {"emoji": "āš™ļø", "style": "yellow"}, + } + + role_lower = role.lower() + return role_map.get(role_lower, {"emoji": "šŸ¤–", "style": "white"}) + -# # Example usage +# Example usage # if __name__ == "__main__": -# swarm = DeepResearchSwarm( -# output_type="json", -# ) -# print( -# swarm.step( -# "What is the active tarrif situation with mexico? 
Only create 2 queries" +# try: +# swarm = DeepResearchSwarm( +# output_type="json", +# ) +# result = swarm.step( +# "What is the active tariff situation with mexico? Only create 2 queries" # ) -# ) + +# # Parse and display results in rich format with markdown export +# swarm.parse_and_display_results(result, export_markdown=True) + +# except Exception as e: +# print(f"Error running deep research swarm: {str(e)}") +# import traceback +# traceback.print_exc()