From e3ef675fc16adcf622fb18dcaebf7f3c460d710a Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Mon, 7 Jul 2025 09:34:41 -0700 Subject: [PATCH] [FEAT][ConcurrentWorkflow Dashboard] [ENHANCE][InteractiveGroupchat] [New Concurrentworkflow docs] --- .github/workflows/docker-publish.yml | 73 ++++ .gitignore | 1 + concurrent_example_dashboard.py | 65 ++++ docs/swarms/examples/concurrent_workflow.md | 125 +++---- example.py | 4 + .../real_estate}/README_realtor.md | 0 .../applications/real_estate/realtor_agent.py | 0 examples/misc/agent_map_test.py | 55 +++ .../sequential_workflow_example.py | 0 ...market_analysis_swarm_router_concurrent.py | 3 + .../{ => utils}/meme_agents/bob_the_agent.py | 0 .../meme_agents/meme_agent_generator.py | 0 .../vision}/multimodal_example.py | 0 swarms/structs/agent.py | 195 +++++++++++ swarms/structs/concurrent_workflow.py | 175 +++++++++- swarms/structs/interactive_groupchat.py | 329 +++++++++--------- swarms/structs/ma_utils.py | 67 +++- swarms/utils/formatter.py | 252 +++++++++++--- tests/structs/test_concurrent_workflow.py | 187 +++++++--- 19 files changed, 1189 insertions(+), 342 deletions(-) create mode 100644 .github/workflows/docker-publish.yml create mode 100644 concurrent_example_dashboard.py rename examples/{ => applications/real_estate}/README_realtor.md (100%) rename realtor_agent.py => examples/applications/real_estate/realtor_agent.py (100%) create mode 100644 examples/misc/agent_map_test.py rename sequential_workflow_example.py => examples/multi_agent/sequential_workflow/sequential_workflow_example.py (100%) rename swarm_router.py => examples/multi_agent/swarm_router/market_analysis_swarm_router_concurrent.py (96%) rename examples/multi_agent/{ => utils}/meme_agents/bob_the_agent.py (100%) rename examples/multi_agent/{ => utils}/meme_agents/meme_agent_generator.py (100%) rename examples/{multi_modal => single_agent/vision}/multimodal_example.py (100%) diff --git a/.github/workflows/docker-publish.yml b/.github/workflows/docker-publish.yml new file mode 100644 index 00000000..17960ff7 --- /dev/null +++ b/.github/workflows/docker-publish.yml @@ -0,0 +1,73 @@ +name: Docker Build and Publish + +on: + push: + branches: [ "master" ] + # Publish semver tags as releases + tags: [ 'v*.*.*' ] + pull_request: + branches: [ "master" ] + +env: + # Use docker.io for Docker Hub if empty + REGISTRY: docker.io + # github.repository as / + IMAGE_NAME: ${{ github.repository }} + +jobs: + build: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + # This is used to complete the identity challenge + # with sigstore/fulcio when running outside of PRs. + id-token: write + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + # Setup QEMU for multi-platform builds + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + + # Setup Docker BuildX + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + # Login to Docker Hub + - name: Log into registry ${{ env.REGISTRY }} + if: github.event_name != 'pull_request' + uses: docker/login-action@v3 + with: + registry: ${{ env.REGISTRY }} + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + + # Extract metadata (tags, labels) for Docker + - name: Extract Docker metadata + id: meta + uses: docker/metadata-action@v5 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + tags: | + type=ref,event=branch + type=ref,event=pr + type=semver,pattern={{version}} + type=semver,pattern={{major}}.{{minor}} + type=semver,pattern={{major}} + type=sha + + # Build and push Docker image + - name: Build and push Docker image + id: build-and-push + uses: docker/build-push-action@v5 + with: + context: . + push: ${{ github.event_name != 'pull_request' }} + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + platforms: linux/amd64,linux/arm64 + cache-from: type=gha + cache-to: type=gha,mode=max \ No newline at end of file diff --git a/.gitignore b/.gitignore index 6df4413d..4a70b60b 100644 --- a/.gitignore +++ b/.gitignore @@ -18,6 +18,7 @@ next_swarms_update.txt runs Financial-Analysis-Agent_state.json conversations/ +evolved_gpt2_models/ experimental ffn_alternatives artifacts_five diff --git a/concurrent_example_dashboard.py b/concurrent_example_dashboard.py new file mode 100644 index 00000000..1c1a8980 --- /dev/null +++ b/concurrent_example_dashboard.py @@ -0,0 +1,65 @@ +from swarms import Agent +from swarms.structs.concurrent_workflow import ConcurrentWorkflow + +# Initialize market research agent +market_researcher = Agent( + agent_name="Market-Researcher", + system_prompt="""You are a market research specialist. Your tasks include: + 1. Analyzing market trends and patterns + 2. Identifying market opportunities and threats + 3. Evaluating competitor strategies + 4. Assessing customer needs and preferences + 5. Providing actionable market insights""", + model_name="claude-3-sonnet-20240229", + max_loops=1, + temperature=0.7, + # streaming_on=True, +) + +# Initialize financial analyst agent +financial_analyst = Agent( + agent_name="Financial-Analyst", + system_prompt="""You are a financial analysis expert. Your responsibilities include: + 1. Analyzing financial statements + 2. Evaluating investment opportunities + 3. Assessing risk factors + 4. Providing financial forecasts + 5. Recommending financial strategies""", + model_name="claude-3-sonnet-20240229", + max_loops=1, + # streaming_on=True, + temperature=0.7, +) + +# Initialize technical analyst agent +technical_analyst = Agent( + agent_name="Technical-Analyst", + system_prompt="""You are a technical analysis specialist. Your focus areas include: + 1. Analyzing price patterns and trends + 2. Evaluating technical indicators + 3. Identifying support and resistance levels + 4. Assessing market momentum + 5. Providing trading recommendations""", + model_name="claude-3-sonnet-20240229", + max_loops=1, + temperature=0.7, + # streaming_on=True, +) + +# Create list of agents +agents = [market_researcher, financial_analyst, technical_analyst] + + +router = ConcurrentWorkflow( + name="market-analysis-router", + agents=agents, + max_loops=1, + # output_type="all", + show_dashboard=True, +) + +result = router.run( + "Analyze Tesla (TSLA) stock from market, financial, and technical perspectives" +) + +print(result) diff --git a/docs/swarms/examples/concurrent_workflow.md b/docs/swarms/examples/concurrent_workflow.md index aac1e9e6..da5b4763 100644 --- a/docs/swarms/examples/concurrent_workflow.md +++ b/docs/swarms/examples/concurrent_workflow.md @@ -28,7 +28,8 @@ GROQ_API_KEY="" ### 1. Initialize Specialized Agents ```python -from swarms import Agent, ConcurrentWorkflow +from swarms import Agent +from swarms.structs.concurrent_workflow import ConcurrentWorkflow # Initialize market research agent market_researcher = Agent( @@ -39,8 +40,9 @@ market_researcher = Agent( 3. Evaluating competitor strategies 4. Assessing customer needs and preferences 5. Providing actionable market insights""", - model_name="gpt-4o", + model_name="claude-3-sonnet-20240229", max_loops=1, + temperature=0.7, ) # Initialize financial analyst agent @@ -52,8 +54,9 @@ financial_analyst = Agent( 3. Assessing risk factors 4. Providing financial forecasts 5. Recommending financial strategies""", - model_name="gpt-4o", + model_name="claude-3-sonnet-20240229", max_loops=1, + temperature=0.7, ) # Initialize technical analyst agent @@ -65,91 +68,45 @@ technical_analyst = Agent( 3. Identifying support and resistance levels 4. Assessing market momentum 5. Providing trading recommendations""", - model_name="gpt-4o", + model_name="claude-3-sonnet-20240229", max_loops=1, + temperature=0.7, ) -``` -### 2. Create and Run ConcurrentWorkflow - -```python # Create list of agents agents = [market_researcher, financial_analyst, technical_analyst] -# Initialize the concurrent workflow -workflow = ConcurrentWorkflow( - name="market-analysis-workflow", +# Initialize the concurrent workflow with dashboard +router = ConcurrentWorkflow( + name="market-analysis-router", agents=agents, max_loops=1, + show_dashboard=True, # Enable the real-time dashboard ) # Run the workflow -result = workflow.run( +result = router.run( "Analyze Tesla (TSLA) stock from market, financial, and technical perspectives" ) ``` -## Advanced Usage - -### 1. Custom Agent Configuration +## Features -```python -from swarms import Agent, ConcurrentWorkflow - -# Initialize agents with custom configurations -sentiment_analyzer = Agent( - agent_name="Sentiment-Analyzer", - system_prompt="You analyze social media sentiment...", - model_name="gpt-4o", - max_loops=1, - temperature=0.7, - streaming_on=True, - verbose=True, -) - -news_analyzer = Agent( - agent_name="News-Analyzer", - system_prompt="You analyze news articles and reports...", - model_name="gpt-4o", - max_loops=1, - temperature=0.5, - streaming_on=True, - verbose=True, -) +### Real-time Dashboard -# Create and run workflow -workflow = ConcurrentWorkflow( - name="sentiment-analysis-workflow", - agents=[sentiment_analyzer, news_analyzer], - max_loops=1, - verbose=True, -) +The ConcurrentWorkflow now includes a real-time dashboard feature that can be enabled by setting `show_dashboard=True`. This provides: -result = workflow.run( - "Analyze the market sentiment for Bitcoin based on social media and news" -) -``` +- Live status of each agent's execution +- Progress tracking +- Real-time output visualization +- Task completion metrics -### 2. Error Handling and Logging +### Concurrent Execution -```python -try: - workflow = ConcurrentWorkflow( - name="error-handled-workflow", - agents=agents, - max_loops=1, - verbose=True, - ) - - result = workflow.run("Complex analysis task") - - # Process results - for agent_result in result: - print(f"Agent {agent_result['agent']}: {agent_result['output']}") - -except Exception as e: - print(f"Error in workflow: {str(e)}") -``` +- Multiple agents work simultaneously +- Efficient resource utilization +- Automatic task distribution +- Built-in thread management ## Best Practices @@ -164,7 +121,7 @@ except Exception as e: - Set meaningful system prompts 3. Resource Management: - - Monitor concurrent execution + - Monitor concurrent execution through the dashboard - Handle rate limits appropriately - Manage memory usage @@ -178,8 +135,8 @@ except Exception as e: Here's a complete example showing how to use ConcurrentWorkflow for a comprehensive market analysis: ```python -import os -from swarms import Agent, ConcurrentWorkflow +from swarms import Agent +from swarms.structs.concurrent_workflow import ConcurrentWorkflow # Initialize specialized agents market_analyst = Agent( @@ -190,8 +147,9 @@ market_analyst = Agent( 3. Market opportunities 4. Industry dynamics 5. Growth potential""", - model_name="gpt-4o", + model_name="claude-3-sonnet-20240229", max_loops=1, + temperature=0.7, ) financial_analyst = Agent( @@ -202,8 +160,9 @@ financial_analyst = Agent( 3. Cash flow analysis 4. Valuation metrics 5. Risk assessment""", - model_name="gpt-4o", + model_name="claude-3-sonnet-20240229", max_loops=1, + temperature=0.7, ) risk_analyst = Agent( @@ -214,19 +173,19 @@ risk_analyst = Agent( 3. Financial risks 4. Regulatory risks 5. Strategic risks""", - model_name="gpt-4o", + model_name="claude-3-sonnet-20240229", max_loops=1, + temperature=0.7, ) -# Create the concurrent workflow +# Create the concurrent workflow with dashboard workflow = ConcurrentWorkflow( name="comprehensive-analysis-workflow", agents=[market_analyst, financial_analyst, risk_analyst], max_loops=1, - verbose=True, + show_dashboard=True, # Enable real-time monitoring ) -# Run the analysis try: result = workflow.run( """Provide a comprehensive analysis of Apple Inc. (AAPL) including: @@ -236,13 +195,15 @@ try: ) # Process and display results - for agent_result in result: - print(f"\nAnalysis from {agent_result['agent']}:") - print(agent_result['output']) - print("-" * 50) + print("\nAnalysis Results:") + print("=" * 50) + for agent_output in result: + print(f"\nAnalysis from {agent_output['agent']}:") + print("-" * 40) + print(agent_output['output']) except Exception as e: print(f"Error during analysis: {str(e)}") ``` -This comprehensive guide demonstrates how to effectively use the ConcurrentWorkflow architecture for parallel processing of complex tasks using multiple specialized agents. \ No newline at end of file +This guide demonstrates how to effectively use the ConcurrentWorkflow architecture with its new dashboard feature for parallel processing of complex tasks using multiple specialized agents. \ No newline at end of file diff --git a/example.py b/example.py index cd0d78be..8321ae86 100644 --- a/example.py +++ b/example.py @@ -1,4 +1,5 @@ import time + from swarms import Agent # Initialize the agent @@ -38,6 +39,9 @@ agent = Agent( model_name="claude-3-sonnet-20240229", dynamic_temperature_enabled=True, output_type="all", + speed_mode="fast", + streaming_on=True, + print_on=True, # dashboard=True ) diff --git a/examples/README_realtor.md b/examples/applications/real_estate/README_realtor.md similarity index 100% rename from examples/README_realtor.md rename to examples/applications/real_estate/README_realtor.md diff --git a/realtor_agent.py b/examples/applications/real_estate/realtor_agent.py similarity index 100% rename from realtor_agent.py rename to examples/applications/real_estate/realtor_agent.py diff --git a/examples/misc/agent_map_test.py b/examples/misc/agent_map_test.py new file mode 100644 index 00000000..caf6eda4 --- /dev/null +++ b/examples/misc/agent_map_test.py @@ -0,0 +1,55 @@ +from swarms import Agent +from swarms.structs.ma_utils import create_agent_map + +# Initialize market research agent +market_researcher = Agent( + agent_name="Market-Researcher", + system_prompt="""You are a market research specialist. Your tasks include: + 1. Analyzing market trends and patterns + 2. Identifying market opportunities and threats + 3. Evaluating competitor strategies + 4. Assessing customer needs and preferences + 5. Providing actionable market insights""", + model_name="claude-3-sonnet-20240229", + max_loops=1, + temperature=0.7, + # streaming_on=True, +) + +# Initialize financial analyst agent +financial_analyst = Agent( + agent_name="Financial-Analyst", + system_prompt="""You are a financial analysis expert. Your responsibilities include: + 1. Analyzing financial statements + 2. Evaluating investment opportunities + 3. Assessing risk factors + 4. Providing financial forecasts + 5. Recommending financial strategies""", + model_name="claude-3-sonnet-20240229", + max_loops=1, + # streaming_on=True, + temperature=0.7, +) + +# Initialize technical analyst agent +technical_analyst = Agent( + agent_name="Technical-Analyst", + system_prompt="""You are a technical analysis specialist. Your focus areas include: + 1. Analyzing price patterns and trends + 2. Evaluating technical indicators + 3. Identifying support and resistance levels + 4. Assessing market momentum + 5. Providing trading recommendations""", + model_name="claude-3-sonnet-20240229", + max_loops=1, + temperature=0.7, + # streaming_on=True, +) + +# Create list of agents +agents = [market_researcher, financial_analyst, technical_analyst] + +out = create_agent_map(agents) +print(out) + +print(out.keys()) diff --git a/sequential_workflow_example.py b/examples/multi_agent/sequential_workflow/sequential_workflow_example.py similarity index 100% rename from sequential_workflow_example.py rename to examples/multi_agent/sequential_workflow/sequential_workflow_example.py diff --git a/swarm_router.py b/examples/multi_agent/swarm_router/market_analysis_swarm_router_concurrent.py similarity index 96% rename from swarm_router.py rename to examples/multi_agent/swarm_router/market_analysis_swarm_router_concurrent.py index efe3bcb5..f0b324a8 100644 --- a/swarm_router.py +++ b/examples/multi_agent/swarm_router/market_analysis_swarm_router_concurrent.py @@ -11,6 +11,7 @@ market_researcher = Agent( 5. Providing actionable market insights""", model_name="claude-sonnet-4-20250514", max_loops=1, + streaming_on=True, ) # Initialize financial analyst agent @@ -24,6 +25,7 @@ financial_analyst = Agent( 5. Recommending financial strategies""", model_name="claude-sonnet-4-20250514", max_loops=1, + streaming_on=True, ) # Initialize technical analyst agent @@ -37,6 +39,7 @@ technical_analyst = Agent( 5. Providing trading recommendations""", model_name="claude-sonnet-4-20250514", max_loops=1, + streaming_on=True, ) # Create list of agents diff --git a/examples/multi_agent/meme_agents/bob_the_agent.py b/examples/multi_agent/utils/meme_agents/bob_the_agent.py similarity index 100% rename from examples/multi_agent/meme_agents/bob_the_agent.py rename to examples/multi_agent/utils/meme_agents/bob_the_agent.py diff --git a/examples/multi_agent/meme_agents/meme_agent_generator.py b/examples/multi_agent/utils/meme_agents/meme_agent_generator.py similarity index 100% rename from examples/multi_agent/meme_agents/meme_agent_generator.py rename to examples/multi_agent/utils/meme_agents/meme_agent_generator.py diff --git a/examples/multi_modal/multimodal_example.py b/examples/single_agent/vision/multimodal_example.py similarity index 100% rename from examples/multi_modal/multimodal_example.py rename to examples/single_agent/vision/multimodal_example.py diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 1b30644c..b3eddf6f 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -433,6 +433,7 @@ class Agent: output_raw_json_from_tool_call: bool = False, summarize_multiple_images: bool = False, tool_retry_attempts: int = 3, + speed_mode: str = "fast", *args, **kwargs, ): @@ -573,6 +574,7 @@ class Agent: ) self.summarize_multiple_images = summarize_multiple_images self.tool_retry_attempts = tool_retry_attempts + self.speed_mode = speed_mode # self.short_memory = self.short_memory_init() @@ -1231,6 +1233,192 @@ class Agent: except KeyboardInterrupt as error: self._handle_run_error(error) + def _run_fast( + self, + task: Optional[Union[str, Any]] = None, + img: Optional[str] = None, + print_task: Optional[bool] = False, + *args, + **kwargs, + ) -> Any: + """ + run the agent + + Args: + task (str): The task to be performed. + img (str): The image to be processed. + is_last (bool): Indicates if this is the last task. + + Returns: + Any: The output of the agent. + (string, list, json, dict, yaml, xml) + + Examples: + agent(task="What is the capital of France?") + agent(task="What is the capital of France?", img="path/to/image.jpg") + agent(task="What is the capital of France?", img="path/to/image.jpg", is_last=True) + """ + try: + + self.short_memory.add(role=self.user_name, content=task) + + # Set the loop count + loop_count = 0 + + # Clear the short memory + response = None + + # Query the long term memory first for the context + if self.long_term_memory is not None: + self.memory_query(task) + + # Print the request + if print_task is True: + formatter.print_panel( + content=f"\n User: {task}", + title=f"Task Request for {self.agent_name}", + ) + + while ( + self.max_loops == "auto" + or loop_count < self.max_loops + ): + loop_count += 1 + + if self.max_loops >= 2: + self.short_memory.add( + role=self.agent_name, + content=f"Current Internal Reasoning Loop: {loop_count}/{self.max_loops}", + ) + + # If it is the final loop, then add the final loop message + if loop_count >= 2 and loop_count == self.max_loops: + self.short_memory.add( + role=self.agent_name, + content=f"🎉 Final Internal Reasoning Loop: {loop_count}/{self.max_loops} Prepare your comprehensive response.", + ) + + # Dynamic temperature + if self.dynamic_temperature_enabled is True: + self.dynamic_temperature() + + # Task prompt + task_prompt = ( + self.short_memory.return_history_as_string() + ) + + # Parameters + attempt = 0 + success = False + while attempt < self.retry_attempts and not success: + try: + if ( + self.long_term_memory is not None + and self.rag_every_loop is True + ): + logger.info( + "Querying RAG database for context..." + ) + self.memory_query(task_prompt) + + if img is not None: + response = self.call_llm( + task=task_prompt, + img=img, + current_loop=loop_count, + *args, + **kwargs, + ) + else: + response = self.call_llm( + task=task_prompt, + current_loop=loop_count, + *args, + **kwargs, + ) + + # If streaming is enabled, then don't print the response + + # Parse the response from the agent with the output type + if exists(self.tools_list_dictionary): + if isinstance(response, BaseModel): + response = response.model_dump() + + # Parse the response from the agent with the output type + response = self.parse_llm_output(response) + + self.short_memory.add( + role=self.agent_name, + content=response, + ) + + # Print + if self.print_on is True: + if isinstance(response, list): + self.pretty_print( + f"Structured Output - Attempting Function Call Execution [{time.strftime('%H:%M:%S')}] \n\n {format_data_structure(response)} ", + loop_count, + ) + elif self.streaming_on is True: + pass + else: + self.pretty_print( + response, loop_count + ) + + # Check and execute callable tools + if exists(self.tools): + self.tool_execution_retry( + response, loop_count + ) + + # Handle MCP tools + if ( + exists(self.mcp_url) + or exists(self.mcp_config) + or exists(self.mcp_urls) + ): + # Only handle MCP tools if response is not None + if response is not None: + self.mcp_tool_handling( + response=response, + current_loop=loop_count, + ) + else: + logger.warning( + f"LLM returned None response in loop {loop_count}, skipping MCP tool handling" + ) + + success = True # Mark as successful to exit the retry loop + + except Exception as e: + + logger.error( + f"Attempt {attempt+1}/{self.retry_attempts}: Error generating response in loop {loop_count} for agent '{self.agent_name}': {str(e)} | " + ) + attempt += 1 + + if not success: + + logger.error( + "Failed to generate a valid response after" + " retry attempts." + ) + break # Exit the loop if all retry attempts fail + + # log_agent_data(self.to_dict()) + + # Output formatting based on output_type + return history_output_formatter( + self.short_memory, type=self.output_type + ) + + except Exception as error: + self._handle_run_error(error) + + except KeyboardInterrupt as error: + self._handle_run_error(error) + def __handle_run_error(self, error: any): import traceback @@ -2676,6 +2864,13 @@ class Agent: *args, **kwargs, ) + elif self.speed_mode == "fast": + output = self._run_fast( + task=task, + img=img, + *args, + **kwargs, + ) else: output = self._run( task=task, diff --git a/swarms/structs/concurrent_workflow.py b/swarms/structs/concurrent_workflow.py index a3abe1eb..b786e3ec 100644 --- a/swarms/structs/concurrent_workflow.py +++ b/swarms/structs/concurrent_workflow.py @@ -9,6 +9,7 @@ from swarms.utils.history_output_formatter import ( history_output_formatter, ) from swarms.utils.loguru_logger import initialize_logger +from swarms.utils.formatter import formatter logger = initialize_logger(log_folder="concurrent_workflow") @@ -32,7 +33,7 @@ class ConcurrentWorkflow(BaseSwarm): return_str_on (bool): Flag indicating whether to return the output as a string. Defaults to False. auto_generate_prompts (bool): Flag indicating whether to auto-generate prompts for agents. Defaults to False. return_entire_history (bool): Flag indicating whether to return the entire conversation history. Defaults to False. - + show_dashboard (bool): Flag indicating whether to show a real-time dashboard. Defaults to True. Raises: ValueError: If the list of agents is empty or if the description is empty. @@ -46,6 +47,8 @@ class ConcurrentWorkflow(BaseSwarm): output_type (str): The type of output format. max_loops (int): The maximum number of loops for each agent. auto_generate_prompts (bool): Flag indicating whether to auto-generate prompts for agents. + show_dashboard (bool): Flag indicating whether to show a real-time dashboard. + agent_statuses (dict): Dictionary to track agent statuses. """ def __init__( @@ -58,6 +61,7 @@ class ConcurrentWorkflow(BaseSwarm): output_type: str = "dict-all-except-first", max_loops: int = 1, auto_generate_prompts: bool = False, + show_dashboard: bool = True, *args, **kwargs, ): @@ -76,10 +80,24 @@ class ConcurrentWorkflow(BaseSwarm): self.max_loops = max_loops self.auto_generate_prompts = auto_generate_prompts self.output_type = output_type + self.show_dashboard = show_dashboard + self.agent_statuses = { + agent.agent_name: {"status": "pending", "output": ""} + for agent in agents + } self.reliability_check() self.conversation = Conversation() + if self.show_dashboard is True: + self.agents = self.fix_agents() + + def fix_agents(self): + if self.show_dashboard is True: + for agent in self.agents: + agent.print_on = False + return self.agents + def reliability_check(self): try: if self.agents is None: @@ -115,7 +133,146 @@ class ConcurrentWorkflow(BaseSwarm): for agent in self.agents: agent.auto_generate_prompt = True - def run( + def display_agent_dashboard( + self, + title: str = "🤖 Agent Dashboard", + is_final: bool = False, + ) -> None: + """ + Displays the current status of all agents in a beautiful dashboard format. + + Args: + title (str): The title of the dashboard. + is_final (bool): Flag indicating whether this is the final dashboard. + """ + agents_data = [ + { + "name": agent.agent_name, + "status": self.agent_statuses[agent.agent_name][ + "status" + ], + "output": self.agent_statuses[agent.agent_name][ + "output" + ], + } + for agent in self.agents + ] + formatter.print_agent_dashboard(agents_data, title, is_final) + + def run_with_dashboard( + self, + task: str, + img: Optional[str] = None, + imgs: Optional[List[str]] = None, + ): + """ + Executes all agents in the workflow concurrently on the given task. + Now includes real-time dashboard updates. + """ + try: + self.conversation.add(role="User", content=task) + + # Reset agent statuses + for agent in self.agents: + self.agent_statuses[agent.agent_name] = { + "status": "pending", + "output": "", + } + + # Display initial dashboard if enabled + if self.show_dashboard: + self.display_agent_dashboard() + + # Use 95% of available CPU cores for optimal performance + max_workers = int(os.cpu_count() * 0.95) + + # Create a list to store all futures and their results + futures = [] + results = [] + + def run_agent_with_status(agent, task, img, imgs): + try: + # Update status to running + self.agent_statuses[agent.agent_name][ + "status" + ] = "running" + if self.show_dashboard: + self.display_agent_dashboard() + + # Run the agent + output = agent.run(task=task, img=img, imgs=imgs) + + # Update status to completed + self.agent_statuses[agent.agent_name][ + "status" + ] = "completed" + self.agent_statuses[agent.agent_name][ + "output" + ] = output + if self.show_dashboard: + self.display_agent_dashboard() + + return output + except Exception as e: + # Update status to error + self.agent_statuses[agent.agent_name][ + "status" + ] = "error" + self.agent_statuses[agent.agent_name][ + "output" + ] = f"Error: {str(e)}" + if self.show_dashboard: + self.display_agent_dashboard() + raise + + # Run agents concurrently using ThreadPoolExecutor + with concurrent.futures.ThreadPoolExecutor( + max_workers=max_workers + ) as executor: + # Submit all agent tasks + futures = [ + executor.submit( + run_agent_with_status, agent, task, img, imgs + ) + for agent in self.agents + ] + + # Wait for all futures to complete + concurrent.futures.wait(futures) + + # Process results in order of completion + for future, agent in zip(futures, self.agents): + try: + output = future.result() + results.append((agent.agent_name, output)) + except Exception as e: + logger.error( + f"Agent {agent.agent_name} failed: {str(e)}" + ) + results.append( + (agent.agent_name, f"Error: {str(e)}") + ) + + # Add all results to conversation + for agent_name, output in results: + self.conversation.add(role=agent_name, content=output) + + # Display final dashboard if enabled + if self.show_dashboard: + self.display_agent_dashboard( + "🎉 Final Agent Dashboard", is_final=True + ) + + return history_output_formatter( + conversation=self.conversation, + type=self.output_type, + ) + finally: + # Always clean up the dashboard display + if self.show_dashboard: + formatter.stop_dashboard() + + def _run( self, task: str, img: Optional[str] = None, @@ -167,6 +324,20 @@ class ConcurrentWorkflow(BaseSwarm): type=self.output_type, ) + def run( + self, + task: str, + img: Optional[str] = None, + imgs: Optional[List[str]] = None, + ): + """ + Executes all agents in the workflow concurrently on the given task. + """ + if self.show_dashboard: + return self.run_with_dashboard(task, img, imgs) + else: + return self._run(task, img, imgs) + def batch_run( self, tasks: List[str], diff --git a/swarms/structs/interactive_groupchat.py b/swarms/structs/interactive_groupchat.py index 9c0e8afe..3174e55c 100644 --- a/swarms/structs/interactive_groupchat.py +++ b/swarms/structs/interactive_groupchat.py @@ -6,6 +6,7 @@ from loguru import logger from swarms.structs.agent import Agent from swarms.structs.conversation import Conversation +from swarms.structs.ma_utils import create_agent_map from swarms.utils.generate_keys import generate_api_key from swarms.utils.history_output_formatter import ( history_output_formatter, @@ -24,12 +25,6 @@ class AgentNotFoundError(InteractiveGroupChatError): pass -class NoMentionedAgentsError(InteractiveGroupChatError): - """Raised when no agents are mentioned in the task""" - - pass - - class InvalidTaskFormatError(InteractiveGroupChatError): """Raised when the task format is invalid""" @@ -294,14 +289,7 @@ class InteractiveGroupChat: # Initialize conversation history self.conversation = Conversation(time_enabled=True) - # Create a mapping of agent names to agents for easy lookup - self.agent_map = {} - for agent in agents: - if isinstance(agent, Agent): - self.agent_map[agent.agent_name] = agent - elif callable(agent): - # For callable functions, use the function name as the agent name - self.agent_map[agent.__name__] = agent + self.agent_map = create_agent_map(self.agents) self._validate_initialization() self._setup_conversation_context() @@ -398,7 +386,7 @@ class InteractiveGroupChat: Start an interactive terminal session for chatting with agents. This method creates a REPL (Read-Eval-Print Loop) that allows users to: - - Chat with agents using @mentions + - Chat with agents using @mentions (optional) - See available agents and their descriptions - Exit the session using 'exit' or 'quit' - Get help using 'help' or '?' @@ -426,7 +414,9 @@ class InteractiveGroupChat: print("- Type 'help' or '?' for help") print("- Type 'exit' or 'quit' to end the session") print("- Type 'speaker' to change speaker function") - print("- Use @agent_name to mention agents") + print( + "- Use @agent_name to mention specific agents (optional)" + ) print("\nStart chatting:") while True: @@ -441,9 +431,11 @@ class InteractiveGroupChat: if user_input.lower() in ["help", "?"]: print("\nHelp:") - print("1. Mention agents using @agent_name") print( - "2. You can mention multiple agents in one task" + "1. You can mention specific agents using @agent_name (optional)" + ) + print( + "2. If no agents are mentioned, they will be selected automatically" ) print("3. Available agents:") for name in self.agent_map: @@ -513,10 +505,6 @@ class InteractiveGroupChat: print("\nChat:") # print(response) - except NoMentionedAgentsError: - print( - "\nError: Please mention at least one agent using @agent_name" - ) except AgentNotFoundError as e: print(f"\nError: {str(e)}") except Exception as e: @@ -699,13 +687,13 @@ Remember: You are part of a team. Your response should reflect that you've read, def _extract_mentions(self, task: str) -> List[str]: """ - Extracts @mentions from the task. + Extracts @mentions from the task. If no mentions are found, returns all available agents. Args: task (str): The input task Returns: - List[str]: List of mentioned agent names + List[str]: List of mentioned agent names or all agent names if no mentions Raises: InvalidtaskFormatError: If the task format is invalid @@ -713,11 +701,17 @@ Remember: You are part of a team. Your response should reflect that you've read, try: # Find all @mentions using regex mentions = re.findall(r"@(\w+)", task) - return [ + valid_mentions = [ mention for mention in mentions if mention in self.agent_map ] + + # If no valid mentions found, return all available agents + if not valid_mentions: + return list(self.agent_map.keys()) + + return valid_mentions except Exception as e: logger.error(f"Error extracting mentions: {e}") raise InvalidTaskFormatError(f"Invalid task format: {e}") @@ -810,6 +804,149 @@ Remember: You are part of a team. Your response should reflect that you've read, # Fallback to original order return mentioned_agents + def _process_dynamic_speakers( + self, + mentioned_agents: List[str], + img: Optional[str], + imgs: Optional[List[str]], + ) -> None: + """ + Process responses using the dynamic speaker function. + """ + # Get strategy from speaker state (default to sequential) + strategy = self.speaker_state.get("strategy", "sequential") + + # Track which agents have spoken to ensure all get a chance + spoken_agents = set() + last_response = "" + max_iterations = ( + len(mentioned_agents) * 3 + ) # Allow more iterations for parallel + iteration = 0 + + while iteration < max_iterations and len(spoken_agents) < len( + mentioned_agents + ): + # Determine next speaker(s) using dynamic function + next_speakers = self.speaker_function( + mentioned_agents, + last_response, + strategy=strategy, + **self.speaker_state, + ) + + # Handle both single agent and multiple agents + if isinstance(next_speakers, str): + next_speakers = [next_speakers] + + # Filter out invalid agents + valid_next_speakers = [ + agent + for agent in next_speakers + if agent in mentioned_agents + ] + + if not valid_next_speakers: + # If no valid mentions found, randomly select from unspoken agents + unspoken_agents = [ + agent + for agent in mentioned_agents + if agent not in spoken_agents + ] + if unspoken_agents: + valid_next_speakers = [ + random.choice(unspoken_agents) + ] + else: + # All agents have spoken, break the loop + break + + # Process agents based on strategy + if strategy == "sequential": + self._process_sequential_speakers( + valid_next_speakers, spoken_agents, img, imgs + ) + elif strategy == "parallel": + self._process_parallel_speakers( + valid_next_speakers, spoken_agents, img, imgs + ) + + iteration += 1 + + def _process_sequential_speakers( + self, + speakers: List[str], + spoken_agents: set, + img: Optional[str], + imgs: Optional[List[str]], + ) -> None: + """ + Process speakers sequentially. + """ + for next_speaker in speakers: + if next_speaker in spoken_agents: + continue # Skip if already spoken + + response = self._get_agent_response( + next_speaker, img, imgs + ) + if response: + spoken_agents.add(next_speaker) + break # Only process one agent in sequential mode + + def _process_parallel_speakers( + self, + speakers: List[str], + spoken_agents: set, + img: Optional[str], + imgs: Optional[List[str]], + ) -> None: + """ + Process speakers in parallel. + """ + import concurrent.futures + + # Get responses from all valid agents + responses = [] + with concurrent.futures.ThreadPoolExecutor() as executor: + future_to_agent = { + executor.submit( + self._get_agent_response, agent, img, imgs + ): agent + for agent in speakers + if agent not in spoken_agents + } + + for future in concurrent.futures.as_completed( + future_to_agent + ): + agent = future_to_agent[future] + try: + response = future.result() + if response: + responses.append(response) + spoken_agents.add(agent) + except Exception as e: + logger.error( + f"Error getting response from {agent}: {e}" + ) + + def _process_static_speakers( + self, + mentioned_agents: List[str], + img: Optional[str], + imgs: Optional[List[str]], + ) -> None: + """ + Process responses using a static speaker function. + """ + speaking_order = self._get_speaking_order(mentioned_agents) + logger.info(f"Speaking order determined: {speaking_order}") + + # Get responses from mentioned agents in the determined order + for agent_name in speaking_order: + self._get_agent_response(agent_name, img, imgs) + def run( self, task: str, @@ -817,151 +954,33 @@ Remember: You are part of a team. Your response should reflect that you've read, imgs: Optional[List[str]] = None, ) -> str: """ - Process a task and get responses from mentioned agents. - If interactive mode is enabled, this will be called by start_interactive_session(). - Otherwise, it can be called directly for single task processing. + Process a task and get responses from agents. If no agents are mentioned, + randomly selects agents to participate. """ try: - # Extract mentioned agents - mentioned_agents = self._extract_mentions(task) - - if not mentioned_agents: - raise NoMentionedAgentsError( - "No valid agents mentioned in the task" - ) + # Extract mentioned agents (or all agents if none mentioned) + if "@" in task: + mentioned_agents = self._extract_mentions(task) + else: + pass # Add user task to conversation self.conversation.add(role="User", content=task) - # Handle dynamic speaker function differently + # Process responses based on speaker function type if self.speaker_function == random_dynamic_speaker: - # Get strategy from speaker state (default to sequential) - strategy = self.speaker_state.get( - "strategy", "sequential" + self._process_dynamic_speakers( + mentioned_agents, img, imgs ) - - # For dynamic speaker, we'll determine the next speaker after each response - # Track which agents have spoken to ensure all get a chance - spoken_agents = set() - last_response = "" - max_iterations = ( - len(mentioned_agents) * 3 - ) # Allow more iterations for parallel - iteration = 0 - - while iteration < max_iterations and len( - spoken_agents - ) < len(mentioned_agents): - # Determine next speaker(s) using dynamic function - next_speakers = self.speaker_function( - mentioned_agents, # Use all mentioned agents, not remaining_agents - last_response, - strategy=strategy, - **self.speaker_state, - ) - - # Handle both single agent and multiple agents - if isinstance(next_speakers, str): - next_speakers = [next_speakers] - - # Filter out invalid agents - valid_next_speakers = [ - agent - for agent in next_speakers - if agent in mentioned_agents - ] - - if not valid_next_speakers: - # If no valid mentions found, randomly select from unspoken agents - unspoken_agents = [ - agent - for agent in mentioned_agents - if agent not in spoken_agents - ] - if unspoken_agents: - valid_next_speakers = [ - random.choice(unspoken_agents) - ] - else: - # All agents have spoken, break the loop - break - - # Process agents based on strategy - if strategy == "sequential": - # Process one agent at a time - for next_speaker in valid_next_speakers: - if next_speaker in spoken_agents: - continue # Skip if already spoken - - response = self._get_agent_response( - next_speaker, img, imgs - ) - if response: - last_response = response - spoken_agents.add(next_speaker) - break # Only process one agent in sequential mode - - elif strategy == "parallel": - # Process all mentioned agents in parallel - import concurrent.futures - - # Get responses from all valid agents - responses = [] - with concurrent.futures.ThreadPoolExecutor() as executor: - future_to_agent = { - executor.submit( - self._get_agent_response, - agent, - img, - imgs, - ): agent - for agent in valid_next_speakers - if agent not in spoken_agents - } - - for ( - future - ) in concurrent.futures.as_completed( - future_to_agent - ): - agent = future_to_agent[future] - try: - response = future.result() - if response: - responses.append(response) - spoken_agents.add(agent) - except Exception as e: - logger.error( - f"Error getting response from {agent}: {e}" - ) - - # Combine responses for next iteration - if responses: - last_response = "\n\n".join(responses) - - iteration += 1 else: - # For non-dynamic speaker functions, use the original logic - speaking_order = self._get_speaking_order( - mentioned_agents + self._process_static_speakers( + mentioned_agents, img, imgs ) - logger.info( - f"Speaking order determined: {speaking_order}" - ) - - # Get responses from mentioned agents in the determined order - for agent_name in speaking_order: - response = self._get_agent_response( - agent_name, img, imgs - ) return history_output_formatter( self.conversation, self.output_type ) - except InteractiveGroupChatError as e: - logger.error(f"GroupChat error: {e}") - raise except Exception as e: logger.error(f"Unexpected error: {e}") raise InteractiveGroupChatError( diff --git a/swarms/structs/ma_utils.py b/swarms/structs/ma_utils.py index 1cf1e0fb..ccb45617 100644 --- a/swarms/structs/ma_utils.py +++ b/swarms/structs/ma_utils.py @@ -1,8 +1,11 @@ -from typing import List, Any, Optional, Union, Callable +from typing import Dict, List, Any, Optional, Union, Callable import random from swarms.prompts.collaborative_prompts import ( get_multi_agent_collaboration_prompt_one, ) +from functools import lru_cache + +from loguru import logger def list_all_agents( @@ -116,3 +119,65 @@ def set_random_models_for_agents( else: setattr(agents, "model_name", random.choice(model_names)) return agents + + +@lru_cache(maxsize=128) +def _create_agent_map_cached( + agent_tuple: tuple, +) -> Dict[str, Union[Callable, Any]]: + """Internal cached version of create_agent_map that takes a tuple for hashability.""" + try: + return { + ( + agent.agent_name + if isinstance(agent, Callable) + else agent.__name__ + ): agent + for agent in agent_tuple + } + except (AttributeError, TypeError) as e: + logger.error(f"Error creating agent map: {e}") + return {} + + +def create_agent_map( + agents: List[Union[Callable, Any]], +) -> Dict[str, Union[Callable, Any]]: + """Creates a map of agent names to agents for fast lookup. + + This function is optimized with LRU caching to avoid recreating maps for identical agent lists. + The cache stores up to 128 different agent map configurations. + + Args: + agents (List[Union[Callable, Any]]): List of agents to create a map of. Each agent should either be: + - A callable with a __name__ attribute + - An object with an agent_name attribute + + Returns: + Dict[str, Union[Callable, Any]]: Map of agent names to agents + + Examples: + >>> def agent1(): pass + >>> def agent2(): pass + >>> agents = [agent1, agent2] + >>> agent_map = create_agent_map(agents) + >>> print(agent_map.keys()) + dict_keys(['agent1', 'agent2']) + + >>> class Agent: + ... def __init__(self, name): + ... self.agent_name = name + >>> agents = [Agent("bot1"), Agent("bot2")] + >>> agent_map = create_agent_map(agents) + >>> print(agent_map.keys()) + dict_keys(['bot1', 'bot2']) + + Raises: + ValueError: If agents list is empty + TypeError: If any agent lacks required name attributes + """ + if not agents: + raise ValueError("Agents list cannot be empty") + + # Convert list to tuple for hashability + return _create_agent_map_cached(tuple(agents)) diff --git a/swarms/utils/formatter.py b/swarms/utils/formatter.py index 34aa5eb8..5f5faccb 100644 --- a/swarms/utils/formatter.py +++ b/swarms/utils/formatter.py @@ -5,9 +5,27 @@ from typing import Any, Callable, Dict, List, Optional from rich.console import Console from rich.live import Live from rich.panel import Panel -from rich.progress import Progress, SpinnerColumn, TextColumn +from rich.progress import ( + Progress, + SpinnerColumn, + TextColumn, +) from rich.table import Table from rich.text import Text +from rich.spinner import Spinner + +# Global lock to ensure only a single Rich Live context is active at any moment. +# Rich's Live render is **not** thread-safe; concurrent Live contexts on the same +# console raise runtime errors. Using a module-level lock serialises access and +# prevents crashes when multiple agents stream simultaneously in different +# threads (e.g., in ConcurrentWorkflow). +live_render_lock = threading.Lock() + +# Global Live display for the dashboard +dashboard_live = None + +# Create a spinner for loading animation +spinner = Spinner("dots", style="yellow") def choose_random_color(): @@ -37,6 +55,53 @@ class Formatter: Initializes the Formatter with a Rich Console instance. """ self.console = Console() + self._dashboard_live = None + self._spinner_frames = [ + "⠋", + "⠙", + "⠹", + "⠸", + "⠼", + "⠴", + "⠦", + "⠧", + "⠇", + "⠏", + ] + self._spinner_idx = 0 + + def _get_status_with_loading(self, status: str) -> Text: + """ + Creates a status text with loading animation for running status. + """ + if status.lower() == "running": + # Create loading bar effect + self._spinner_idx = (self._spinner_idx + 1) % len( + self._spinner_frames + ) + spinner_char = self._spinner_frames[self._spinner_idx] + progress_bar = "█" * (self._spinner_idx % 5) + "░" * ( + 4 - (self._spinner_idx % 5) + ) + return Text( + f"{spinner_char} {status} {progress_bar}", + style="bold yellow", + ) + + # Style other statuses + status_style = { + "completed": "bold green", + "pending": "bold red", + "error": "bold red", + }.get(status.lower(), "white") + + status_symbol = { + "completed": "✓", + "pending": "○", + "error": "✗", + }.get(status.lower(), "•") + + return Text(f"{status_symbol} {status}", style=status_style) def _print_panel( self, content: str, title: str = "", style: str = "bold blue" @@ -209,58 +274,155 @@ class Formatter: complete_response = "" chunks_collected = [] - # TRUE streaming with Rich's automatic text wrapping - with Live( - create_streaming_panel(streaming_text), - console=self.console, - refresh_per_second=20, - ) as live: - try: - for part in streaming_response: - if ( - hasattr(part, "choices") - and part.choices - and part.choices[0].delta.content - ): - # Add ONLY the new chunk to the Text object with random color style - chunk = part.choices[0].delta.content - streaming_text.append(chunk, style=text_style) - complete_response += chunk - - # Collect chunks if requested - if collect_chunks: - chunks_collected.append(chunk) - - # Call chunk callback if provided - if on_chunk_callback: - on_chunk_callback(chunk) - - # Update display with new text - Rich handles all wrapping automatically - live.update( - create_streaming_panel( - streaming_text, is_complete=False + # Acquire the lock so that only one Live panel is active at a time. + # Other threads will wait here until the current streaming completes, + # avoiding Rich.Live concurrency errors. + with live_render_lock: + # TRUE streaming with Rich's automatic text wrapping + with Live( + create_streaming_panel(streaming_text), + console=self.console, + refresh_per_second=20, + ) as live: + try: + for part in streaming_response: + if ( + hasattr(part, "choices") + and part.choices + and part.choices[0].delta.content + ): + # Add ONLY the new chunk to the Text object with random color style + chunk = part.choices[0].delta.content + streaming_text.append( + chunk, style=text_style + ) + complete_response += chunk + + # Collect chunks if requested + if collect_chunks: + chunks_collected.append(chunk) + + # Call chunk callback if provided + if on_chunk_callback: + on_chunk_callback(chunk) + + # Update display with new text - Rich handles all wrapping automatically + live.update( + create_streaming_panel( + streaming_text, is_complete=False + ) ) + + # Final update to show completion + live.update( + create_streaming_panel( + streaming_text, is_complete=True ) + ) - # Final update to show completion - live.update( - create_streaming_panel( - streaming_text, is_complete=True + except Exception as e: + # Handle any streaming errors gracefully + streaming_text.append( + f"\n[Error: {str(e)}]", style="bold red" ) - ) + live.update( + create_streaming_panel( + streaming_text, is_complete=True + ) + ) + + return complete_response + + def _create_dashboard_table( + self, agents_data: List[Dict[str, Any]], title: str + ) -> Panel: + """ + Creates the dashboard table with the current agent statuses. + """ + # Create main table + table = Table( + show_header=True, + header_style="bold magenta", + expand=True, + title=title, + title_style="bold cyan", + border_style="bright_blue", + show_lines=True, # Add lines between rows + ) + + # Add columns with adjusted widths + table.add_column( + "Agent Name", style="cyan", width=30, no_wrap=True + ) + table.add_column( + "Status", style="green", width=20, no_wrap=True + ) # Increased width for loading animation + table.add_column( + "Output", style="white", width=100, overflow="fold" + ) # Allow text to wrap + + # Add rows for each agent + for agent in agents_data: + name = Text(agent["name"], style="bold cyan") + status = self._get_status_with_loading(agent["status"]) + output = Text(str(agent["output"])) + table.add_row(name, status, output) + + # Create a panel to wrap the table + dashboard_panel = Panel( + table, + border_style="bright_blue", + padding=(1, 2), + title=f"[bold cyan]{title}[/bold cyan] - Total Agents: [bold green]{len(agents_data)}[/bold green]", + expand=True, # Make panel expand to full width + ) + + return dashboard_panel - except Exception as e: - # Handle any streaming errors gracefully - streaming_text.append( - f"\n[Error: {str(e)}]", style="bold red" + def print_agent_dashboard( + self, + agents_data: List[Dict[str, Any]], + title: str = "🤖 Agent Dashboard", + is_final: bool = False, + ) -> None: + """ + Displays a beautiful dashboard showing agent information in a panel-like spreadsheet format. + Updates in place instead of printing multiple times. + + Args: + agents_data (List[Dict[str, Any]]): List of dictionaries containing agent information. + Each dict should have: name, status, output + title (str): The title of the dashboard. + is_final (bool): Whether this is the final update of the dashboard. + """ + with live_render_lock: + if self._dashboard_live is None: + # Create new Live display if none exists + self._dashboard_live = Live( + self._create_dashboard_table(agents_data, title), + console=self.console, + refresh_per_second=10, # Increased refresh rate + transient=False, # Make display persistent ) - live.update( - create_streaming_panel( - streaming_text, is_complete=True - ) + self._dashboard_live.start() + else: + # Update existing Live display + self._dashboard_live.update( + self._create_dashboard_table(agents_data, title) ) - return complete_response + # If this is the final update, add a newline to separate from future output + if is_final: + self.console.print() # Add blank line after final display + + def stop_dashboard(self): + """ + Stops and cleans up the dashboard display. + """ + if self._dashboard_live is not None: + self._dashboard_live.stop() + self.console.print() # Add blank line after stopping + self._dashboard_live = None formatter = Formatter() diff --git a/tests/structs/test_concurrent_workflow.py b/tests/structs/test_concurrent_workflow.py index e3fabdd5..9cad973e 100644 --- a/tests/structs/test_concurrent_workflow.py +++ b/tests/structs/test_concurrent_workflow.py @@ -1,57 +1,130 @@ -from concurrent.futures import Future -from unittest.mock import Mock, create_autospec, patch - -from swarms.structs import Agent, ConcurrentWorkflow, Task - - -def test_add(): - workflow = ConcurrentWorkflow(max_workers=2) - task = Mock(spec=Task) - workflow.add(task) - assert task in workflow.tasks - - -def test_run(): - workflow = ConcurrentWorkflow(max_workers=2) - task1 = create_autospec(Task) - task2 = create_autospec(Task) - workflow.add(task1) - workflow.add(task2) - - with patch( - "concurrent.futures.ThreadPoolExecutor" - ) as mock_executor: - future1 = Future() - future1.set_result(None) - future2 = Future() - future2.set_result(None) - - mock_executor.return_value.__enter__.return_value.submit.side_effect = [ - future1, - future2, - ] - mock_executor.return_value.__enter__.return_value.as_completed.return_value = [ - future1, - future2, - ] - - workflow.run() - - task1.execute.assert_called_once() - task2.execute.assert_called_once() - - -def test_execute_task(): - workflow = ConcurrentWorkflow(max_workers=2) - task = create_autospec(Task) - workflow._execute_task(task) - task.execute.assert_called_once() - - -def test_agent_execution(): - workflow = ConcurrentWorkflow(max_workers=2) - agent = create_autospec(Agent) - task = Task(agent) - workflow.add(task) - workflow._execute_task(task) - agent.execute.assert_called_once() +from swarms import Agent +from swarms.structs.concurrent_workflow import ConcurrentWorkflow + + +def test_basic_workflow(): + """Test basic workflow initialization and execution""" + # Create test agents + agent1 = Agent( + agent_name="Test-Agent-1", + system_prompt="You are a test agent 1", + model_name="claude-3-sonnet-20240229", + max_loops=1, + ) + + agent2 = Agent( + agent_name="Test-Agent-2", + system_prompt="You are a test agent 2", + model_name="claude-3-sonnet-20240229", + max_loops=1, + ) + + # Create workflow + workflow = ConcurrentWorkflow( + name="test-workflow", agents=[agent1, agent2], max_loops=1 + ) + + # Run workflow + result = workflow.run("Test task") + + # Verify results + assert len(result) == 2 + assert all(isinstance(r, dict) for r in result) + assert all("agent" in r and "output" in r for r in result) + + +def test_dashboard_workflow(): + """Test workflow with dashboard enabled""" + agent = Agent( + agent_name="Dashboard-Test-Agent", + system_prompt="You are a test agent", + model_name="claude-3-sonnet-20240229", + max_loops=1, + ) + + workflow = ConcurrentWorkflow( + name="dashboard-test", + agents=[agent], + max_loops=1, + show_dashboard=True, + ) + + result = workflow.run("Test task") + + assert len(result) == 1 + assert isinstance(result[0], dict) + assert "agent" in result[0] + assert "output" in result[0] + + +def test_multiple_agents(): + """Test workflow with multiple agents""" + agents = [ + Agent( + agent_name=f"Agent-{i}", + system_prompt=f"You are test agent {i}", + model_name="claude-3-sonnet-20240229", + max_loops=1, + ) + for i in range(3) + ] + + workflow = ConcurrentWorkflow( + name="multi-agent-test", agents=agents, max_loops=1 + ) + + result = workflow.run("Multi-agent test task") + + assert len(result) == 3 + assert all(isinstance(r, dict) for r in result) + assert all("agent" in r and "output" in r for r in result) + + +def test_error_handling(): + """Test workflow error handling""" + # Create an agent that will raise an exception + agent = Agent( + agent_name="Error-Agent", + system_prompt="You are a test agent that will raise an error", + model_name="invalid-model", # This will cause an error + max_loops=1, + ) + + workflow = ConcurrentWorkflow( + name="error-test", agents=[agent], max_loops=1 + ) + + try: + workflow.run("Test task") + assert False, "Expected an error but none was raised" + except Exception as e: + assert str(e) != "" # Verify we got an error message + + +def test_max_loops(): + """Test workflow respects max_loops setting""" + agent = Agent( + agent_name="Loop-Test-Agent", + system_prompt="You are a test agent", + model_name="claude-3-sonnet-20240229", + max_loops=2, + ) + + workflow = ConcurrentWorkflow( + name="loop-test", + agents=[agent], + max_loops=1, # This should override agent's max_loops + ) + + result = workflow.run("Test task") + + assert len(result) == 1 + assert isinstance(result[0], dict) + + +if __name__ == "__main__": + test_basic_workflow() + test_dashboard_workflow() + test_multiple_agents() + test_error_handling() + test_max_loops()