[FEAT][ConcurrentWorkflow Dashboard] [ENHANCE][InteractiveGroupChat] [New ConcurrentWorkflow docs]

master
Kye Gomez 1 day ago
parent 1ab3eeea55
commit e3ef675fc1

@@ -0,0 +1,73 @@
name: Docker Build and Publish
on:
push:
branches: [ "master" ]
# Publish semver tags as releases
tags: [ 'v*.*.*' ]
pull_request:
branches: [ "master" ]
env:
# Use docker.io for Docker Hub if empty
REGISTRY: docker.io
# github.repository as <account>/<repo>
IMAGE_NAME: ${{ github.repository }}
jobs:
build:
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
# This is used to complete the identity challenge
# with sigstore/fulcio when running outside of PRs.
id-token: write
steps:
- name: Checkout repository
uses: actions/checkout@v4
# Setup QEMU for multi-platform builds
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
# Setup Docker BuildX
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
# Login to Docker Hub
- name: Log into registry ${{ env.REGISTRY }}
if: github.event_name != 'pull_request'
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
# Extract metadata (tags, labels) for Docker
- name: Extract Docker metadata
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=ref,event=branch
type=ref,event=pr
type=semver,pattern={{version}}
type=semver,pattern={{major}}.{{minor}}
type=semver,pattern={{major}}
type=sha
# Build and push Docker image
- name: Build and push Docker image
id: build-and-push
uses: docker/build-push-action@v5
with:
context: .
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
platforms: linux/amd64,linux/arm64
cache-from: type=gha
cache-to: type=gha,mode=max
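The `docker/metadata-action` step above turns a pushed git tag into several image tags via its `type=semver` patterns. As a rough illustration of the expansion (not the action's actual implementation), a tag like `v1.2.3` yields:

```python
import re

def expand_semver_tags(git_tag: str) -> list:
    # Mirrors the three semver patterns declared in the workflow:
    # {{version}}, {{major}}.{{minor}}, and {{major}}.
    m = re.fullmatch(r"v(\d+)\.(\d+)\.(\d+)", git_tag)
    if not m:
        return []  # non-semver refs fall through to the branch/sha tags
    major, minor, patch = m.groups()
    return [
        f"{major}.{minor}.{patch}",  # pattern={{version}}
        f"{major}.{minor}",          # pattern={{major}}.{{minor}}
        major,                       # pattern={{major}}
    ]

print(expand_semver_tags("v1.2.3"))  # -> ['1.2.3', '1.2', '1']
```

So pushing `v1.2.3` publishes the image under `1.2.3`, `1.2`, and `1`, alongside the branch and `sha` tags.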

1
.gitignore vendored

@@ -18,6 +18,7 @@ next_swarms_update.txt
runs
Financial-Analysis-Agent_state.json
conversations/
evolved_gpt2_models/
experimental
ffn_alternatives
artifacts_five

@@ -0,0 +1,65 @@
from swarms import Agent
from swarms.structs.concurrent_workflow import ConcurrentWorkflow
# Initialize market research agent
market_researcher = Agent(
agent_name="Market-Researcher",
system_prompt="""You are a market research specialist. Your tasks include:
1. Analyzing market trends and patterns
2. Identifying market opportunities and threats
3. Evaluating competitor strategies
4. Assessing customer needs and preferences
5. Providing actionable market insights""",
model_name="claude-3-sonnet-20240229",
max_loops=1,
temperature=0.7,
# streaming_on=True,
)
# Initialize financial analyst agent
financial_analyst = Agent(
agent_name="Financial-Analyst",
system_prompt="""You are a financial analysis expert. Your responsibilities include:
1. Analyzing financial statements
2. Evaluating investment opportunities
3. Assessing risk factors
4. Providing financial forecasts
5. Recommending financial strategies""",
model_name="claude-3-sonnet-20240229",
max_loops=1,
# streaming_on=True,
temperature=0.7,
)
# Initialize technical analyst agent
technical_analyst = Agent(
agent_name="Technical-Analyst",
system_prompt="""You are a technical analysis specialist. Your focus areas include:
1. Analyzing price patterns and trends
2. Evaluating technical indicators
3. Identifying support and resistance levels
4. Assessing market momentum
5. Providing trading recommendations""",
model_name="claude-3-sonnet-20240229",
max_loops=1,
temperature=0.7,
# streaming_on=True,
)
# Create list of agents
agents = [market_researcher, financial_analyst, technical_analyst]
router = ConcurrentWorkflow(
name="market-analysis-router",
agents=agents,
max_loops=1,
# output_type="all",
show_dashboard=True,
)
result = router.run(
"Analyze Tesla (TSLA) stock from market, financial, and technical perspectives"
)
print(result)

@@ -28,7 +28,8 @@ GROQ_API_KEY=""
### 1. Initialize Specialized Agents
```python
from swarms import Agent, ConcurrentWorkflow
from swarms import Agent
from swarms.structs.concurrent_workflow import ConcurrentWorkflow
# Initialize market research agent
market_researcher = Agent(
@@ -39,8 +40,9 @@ market_researcher = Agent(
3. Evaluating competitor strategies
4. Assessing customer needs and preferences
5. Providing actionable market insights""",
model_name="gpt-4o",
model_name="claude-3-sonnet-20240229",
max_loops=1,
temperature=0.7,
)
# Initialize financial analyst agent
@@ -52,8 +54,9 @@ financial_analyst = Agent(
3. Assessing risk factors
4. Providing financial forecasts
5. Recommending financial strategies""",
model_name="gpt-4o",
model_name="claude-3-sonnet-20240229",
max_loops=1,
temperature=0.7,
)
# Initialize technical analyst agent
@@ -65,91 +68,45 @@ technical_analyst = Agent(
3. Identifying support and resistance levels
4. Assessing market momentum
5. Providing trading recommendations""",
model_name="gpt-4o",
model_name="claude-3-sonnet-20240229",
max_loops=1,
temperature=0.7,
)
```
### 2. Create and Run ConcurrentWorkflow
```python
# Create list of agents
agents = [market_researcher, financial_analyst, technical_analyst]
# Initialize the concurrent workflow
workflow = ConcurrentWorkflow(
name="market-analysis-workflow",
# Initialize the concurrent workflow with dashboard
router = ConcurrentWorkflow(
name="market-analysis-router",
agents=agents,
max_loops=1,
show_dashboard=True, # Enable the real-time dashboard
)
# Run the workflow
result = workflow.run(
result = router.run(
"Analyze Tesla (TSLA) stock from market, financial, and technical perspectives"
)
```
## Advanced Usage
### 1. Custom Agent Configuration
## Features
```python
from swarms import Agent, ConcurrentWorkflow
# Initialize agents with custom configurations
sentiment_analyzer = Agent(
agent_name="Sentiment-Analyzer",
system_prompt="You analyze social media sentiment...",
model_name="gpt-4o",
max_loops=1,
temperature=0.7,
streaming_on=True,
verbose=True,
)
news_analyzer = Agent(
agent_name="News-Analyzer",
system_prompt="You analyze news articles and reports...",
model_name="gpt-4o",
max_loops=1,
temperature=0.5,
streaming_on=True,
verbose=True,
)
### Real-time Dashboard
# Create and run workflow
workflow = ConcurrentWorkflow(
name="sentiment-analysis-workflow",
agents=[sentiment_analyzer, news_analyzer],
max_loops=1,
verbose=True,
)
The ConcurrentWorkflow now includes a real-time dashboard feature that can be enabled by setting `show_dashboard=True`. This provides:
result = workflow.run(
"Analyze the market sentiment for Bitcoin based on social media and news"
)
```
- Live status of each agent's execution
- Progress tracking
- Real-time output visualization
- Task completion metrics
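The dashboard is driven by a simple per-agent status map. As a sketch of the idea (names here are illustrative, not the workflow's exact attributes):

```python
def make_status_map(agent_names):
    # Each agent starts as "pending"; the workflow flips the entry to
    # "running", then "completed" or "error", as execution proceeds.
    return {
        name: {"status": "pending", "output": ""}
        for name in agent_names
    }

statuses = make_status_map(["Market-Researcher", "Financial-Analyst"])
statuses["Market-Researcher"]["status"] = "running"
print(statuses["Market-Researcher"]["status"])  # -> running
```

The dashboard can then re-render this map on every status transition, which is what produces the live progress view.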
### 2. Error Handling and Logging
### Concurrent Execution
```python
try:
workflow = ConcurrentWorkflow(
name="error-handled-workflow",
agents=agents,
max_loops=1,
verbose=True,
)
result = workflow.run("Complex analysis task")
# Process results
for agent_result in result:
print(f"Agent {agent_result['agent']}: {agent_result['output']}")
except Exception as e:
print(f"Error in workflow: {str(e)}")
```
- Multiple agents work simultaneously
- Efficient resource utilization
- Automatic task distribution
- Built-in thread management
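Stripped of the agent machinery, the concurrent execution model is a standard thread-pool fan-out: the same task goes to every agent, and results are collected with per-agent error capture. A minimal sketch with plain functions standing in for agents:

```python
import concurrent.futures

def fan_out(task, workers):
    # Submit the same task to every worker, wait for all of them,
    # and collect results in submission order, capturing failures.
    results = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(worker, task) for worker in workers]
        concurrent.futures.wait(futures)
        for future in futures:
            try:
                results.append(future.result())
            except Exception as e:
                results.append(f"Error: {e}")
    return results

workers = [lambda t: f"market view of {t}", lambda t: f"financial view of {t}"]
print(fan_out("TSLA", workers))
```

One failed agent does not abort the run; its slot simply holds an error string, which matches the workflow's behavior.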
## Best Practices
@@ -164,7 +121,7 @@ except Exception as e:
- Set meaningful system prompts
3. Resource Management:
- Monitor concurrent execution
- Monitor concurrent execution through the dashboard
- Handle rate limits appropriately
- Manage memory usage
@@ -178,8 +135,8 @@ except Exception as e:
Here's a complete example showing how to use ConcurrentWorkflow for a comprehensive market analysis:
```python
import os
from swarms import Agent, ConcurrentWorkflow
from swarms import Agent
from swarms.structs.concurrent_workflow import ConcurrentWorkflow
# Initialize specialized agents
market_analyst = Agent(
@@ -190,8 +147,9 @@ market_analyst = Agent(
3. Market opportunities
4. Industry dynamics
5. Growth potential""",
model_name="gpt-4o",
model_name="claude-3-sonnet-20240229",
max_loops=1,
temperature=0.7,
)
financial_analyst = Agent(
@@ -202,8 +160,9 @@ financial_analyst = Agent(
3. Cash flow analysis
4. Valuation metrics
5. Risk assessment""",
model_name="gpt-4o",
model_name="claude-3-sonnet-20240229",
max_loops=1,
temperature=0.7,
)
risk_analyst = Agent(
@@ -214,19 +173,19 @@ risk_analyst = Agent(
3. Financial risks
4. Regulatory risks
5. Strategic risks""",
model_name="gpt-4o",
model_name="claude-3-sonnet-20240229",
max_loops=1,
temperature=0.7,
)
# Create the concurrent workflow
# Create the concurrent workflow with dashboard
workflow = ConcurrentWorkflow(
name="comprehensive-analysis-workflow",
agents=[market_analyst, financial_analyst, risk_analyst],
max_loops=1,
verbose=True,
show_dashboard=True, # Enable real-time monitoring
)
# Run the analysis
try:
result = workflow.run(
"""Provide a comprehensive analysis of Apple Inc. (AAPL) including:
@@ -236,13 +195,15 @@ try:
)
# Process and display results
for agent_result in result:
print(f"\nAnalysis from {agent_result['agent']}:")
print(agent_result['output'])
print("-" * 50)
print("\nAnalysis Results:")
print("=" * 50)
for agent_output in result:
print(f"\nAnalysis from {agent_output['agent']}:")
print("-" * 40)
print(agent_output['output'])
except Exception as e:
print(f"Error during analysis: {str(e)}")
```
This comprehensive guide demonstrates how to effectively use the ConcurrentWorkflow architecture for parallel processing of complex tasks using multiple specialized agents.
This guide demonstrates how to effectively use the ConcurrentWorkflow architecture with its new dashboard feature for parallel processing of complex tasks using multiple specialized agents.

@@ -1,4 +1,5 @@
import time
from swarms import Agent
# Initialize the agent
@@ -38,6 +39,9 @@ agent = Agent(
model_name="claude-3-sonnet-20240229",
dynamic_temperature_enabled=True,
output_type="all",
speed_mode="fast",
streaming_on=True,
print_on=True,
# dashboard=True
)
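The new `speed_mode` flag selects an execution path when the agent runs. A minimal sketch of that kind of dispatch (illustrative stand-ins, not the Agent class itself):

```python
def _run_fast(task: str) -> str:
    # Hypothetical streamlined path with less per-loop bookkeeping.
    return f"fast:{task}"

def _run_full(task: str) -> str:
    # Hypothetical full path with all the usual hooks.
    return f"full:{task}"

def run(task: str, speed_mode: str = "fast") -> str:
    # Mirrors the speed_mode branch added to Agent.run in this commit:
    # "fast" routes to _run_fast, anything else falls back to the full loop.
    handler = _run_fast if speed_mode == "fast" else _run_full
    return handler(task)

print(run("analyze TSLA"))                         # -> fast:analyze TSLA
print(run("analyze TSLA", speed_mode="standard"))  # -> full:analyze TSLA
```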

@@ -0,0 +1,55 @@
from swarms import Agent
from swarms.structs.ma_utils import create_agent_map
# Initialize market research agent
market_researcher = Agent(
agent_name="Market-Researcher",
system_prompt="""You are a market research specialist. Your tasks include:
1. Analyzing market trends and patterns
2. Identifying market opportunities and threats
3. Evaluating competitor strategies
4. Assessing customer needs and preferences
5. Providing actionable market insights""",
model_name="claude-3-sonnet-20240229",
max_loops=1,
temperature=0.7,
# streaming_on=True,
)
# Initialize financial analyst agent
financial_analyst = Agent(
agent_name="Financial-Analyst",
system_prompt="""You are a financial analysis expert. Your responsibilities include:
1. Analyzing financial statements
2. Evaluating investment opportunities
3. Assessing risk factors
4. Providing financial forecasts
5. Recommending financial strategies""",
model_name="claude-3-sonnet-20240229",
max_loops=1,
# streaming_on=True,
temperature=0.7,
)
# Initialize technical analyst agent
technical_analyst = Agent(
agent_name="Technical-Analyst",
system_prompt="""You are a technical analysis specialist. Your focus areas include:
1. Analyzing price patterns and trends
2. Evaluating technical indicators
3. Identifying support and resistance levels
4. Assessing market momentum
5. Providing trading recommendations""",
model_name="claude-3-sonnet-20240229",
max_loops=1,
temperature=0.7,
# streaming_on=True,
)
# Create list of agents
agents = [market_researcher, financial_analyst, technical_analyst]
out = create_agent_map(agents)
print(out)
print(out.keys())
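Judging by its use here and in InteractiveGroupChat later in this commit, `create_agent_map` builds a name-to-agent lookup. A plausible sketch of the behavior with stand-in objects (the real helper lives in `swarms.structs.ma_utils`):

```python
class FakeAgent:
    # Minimal stand-in for swarms.Agent: only the name attribute.
    def __init__(self, agent_name):
        self.agent_name = agent_name

def agent_map_sketch(agents):
    # Agent objects are keyed by agent_name; bare callables by __name__.
    out = {}
    for agent in agents:
        if hasattr(agent, "agent_name"):
            out[agent.agent_name] = agent
        elif callable(agent):
            out[agent.__name__] = agent
    return out

def calculator(task):
    return "42"

m = agent_map_sketch([FakeAgent("Market-Researcher"), calculator])
print(list(m.keys()))  # -> ['Market-Researcher', 'calculator']
```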

@@ -11,6 +11,7 @@ market_researcher = Agent(
5. Providing actionable market insights""",
model_name="claude-sonnet-4-20250514",
max_loops=1,
streaming_on=True,
)
# Initialize financial analyst agent
@@ -24,6 +25,7 @@ financial_analyst = Agent(
5. Recommending financial strategies""",
model_name="claude-sonnet-4-20250514",
max_loops=1,
streaming_on=True,
)
# Initialize technical analyst agent
@@ -37,6 +39,7 @@ technical_analyst = Agent(
5. Providing trading recommendations""",
model_name="claude-sonnet-4-20250514",
max_loops=1,
streaming_on=True,
)
# Create list of agents

@@ -433,6 +433,7 @@ class Agent:
output_raw_json_from_tool_call: bool = False,
summarize_multiple_images: bool = False,
tool_retry_attempts: int = 3,
speed_mode: str = "fast",
*args,
**kwargs,
):
@@ -573,6 +574,7 @@ class Agent:
)
self.summarize_multiple_images = summarize_multiple_images
self.tool_retry_attempts = tool_retry_attempts
self.speed_mode = speed_mode
# self.short_memory = self.short_memory_init()
@@ -1231,6 +1233,192 @@ class Agent:
except KeyboardInterrupt as error:
self._handle_run_error(error)
def _run_fast(
self,
task: Optional[Union[str, Any]] = None,
img: Optional[str] = None,
print_task: bool = False,
*args,
**kwargs,
) -> Any:
"""
run the agent
Args:
task (str): The task to be performed.
img (str): The image to be processed.
is_last (bool): Indicates if this is the last task.
Returns:
Any: The output of the agent.
(string, list, json, dict, yaml, xml)
Examples:
agent(task="What is the capital of France?")
agent(task="What is the capital of France?", img="path/to/image.jpg")
agent(task="What is the capital of France?", img="path/to/image.jpg", is_last=True)
"""
try:
self.short_memory.add(role=self.user_name, content=task)
# Set the loop count
loop_count = 0
# Initialize the response holder
response = None
# Query the long term memory first for the context
if self.long_term_memory is not None:
self.memory_query(task)
# Print the request
if print_task is True:
formatter.print_panel(
content=f"\n User: {task}",
title=f"Task Request for {self.agent_name}",
)
while (
self.max_loops == "auto"
or loop_count < self.max_loops
):
loop_count += 1
if self.max_loops != "auto" and self.max_loops >= 2:
self.short_memory.add(
role=self.agent_name,
content=f"Current Internal Reasoning Loop: {loop_count}/{self.max_loops}",
)
# If it is the final loop, then add the final loop message
if loop_count >= 2 and loop_count == self.max_loops:
self.short_memory.add(
role=self.agent_name,
content=f"🎉 Final Internal Reasoning Loop: {loop_count}/{self.max_loops} Prepare your comprehensive response.",
)
# Dynamic temperature
if self.dynamic_temperature_enabled is True:
self.dynamic_temperature()
# Task prompt
task_prompt = (
self.short_memory.return_history_as_string()
)
# Parameters
attempt = 0
success = False
while attempt < self.retry_attempts and not success:
try:
if (
self.long_term_memory is not None
and self.rag_every_loop is True
):
logger.info(
"Querying RAG database for context..."
)
self.memory_query(task_prompt)
if img is not None:
response = self.call_llm(
task=task_prompt,
img=img,
current_loop=loop_count,
*args,
**kwargs,
)
else:
response = self.call_llm(
task=task_prompt,
current_loop=loop_count,
*args,
**kwargs,
)
# Convert structured tool-call outputs to plain dicts
if exists(self.tools_list_dictionary):
if isinstance(response, BaseModel):
response = response.model_dump()
# Parse the response from the agent with the output type
response = self.parse_llm_output(response)
self.short_memory.add(
role=self.agent_name,
content=response,
)
# Print
if self.print_on is True:
if isinstance(response, list):
self.pretty_print(
f"Structured Output - Attempting Function Call Execution [{time.strftime('%H:%M:%S')}] \n\n {format_data_structure(response)} ",
loop_count,
)
elif self.streaming_on is True:
pass
else:
self.pretty_print(
response, loop_count
)
# Check and execute callable tools
if exists(self.tools):
self.tool_execution_retry(
response, loop_count
)
# Handle MCP tools
if (
exists(self.mcp_url)
or exists(self.mcp_config)
or exists(self.mcp_urls)
):
# Only handle MCP tools if response is not None
if response is not None:
self.mcp_tool_handling(
response=response,
current_loop=loop_count,
)
else:
logger.warning(
f"LLM returned None response in loop {loop_count}, skipping MCP tool handling"
)
success = True # Mark as successful to exit the retry loop
except Exception as e:
logger.error(
f"Attempt {attempt+1}/{self.retry_attempts}: Error generating response in loop {loop_count} for agent '{self.agent_name}': {str(e)} | "
)
attempt += 1
if not success:
logger.error(
"Failed to generate a valid response after"
" retry attempts."
)
break # Exit the loop if all retry attempts fail
# log_agent_data(self.to_dict())
# Output formatting based on output_type
return history_output_formatter(
self.short_memory, type=self.output_type
)
except Exception as error:
self._handle_run_error(error)
except KeyboardInterrupt as error:
self._handle_run_error(error)
def __handle_run_error(self, error: any):
import traceback
@@ -2676,6 +2864,13 @@ class Agent:
*args,
**kwargs,
)
elif self.speed_mode == "fast":
output = self._run_fast(
task=task,
img=img,
*args,
**kwargs,
)
else:
output = self._run(
task=task,

@@ -9,6 +9,7 @@ from swarms.utils.history_output_formatter import (
history_output_formatter,
)
from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.formatter import formatter
logger = initialize_logger(log_folder="concurrent_workflow")
@@ -32,7 +33,7 @@ class ConcurrentWorkflow(BaseSwarm):
return_str_on (bool): Flag indicating whether to return the output as a string. Defaults to False.
auto_generate_prompts (bool): Flag indicating whether to auto-generate prompts for agents. Defaults to False.
return_entire_history (bool): Flag indicating whether to return the entire conversation history. Defaults to False.
show_dashboard (bool): Flag indicating whether to show a real-time dashboard. Defaults to True.
Raises:
ValueError: If the list of agents is empty or if the description is empty.
@@ -46,6 +47,8 @@ class ConcurrentWorkflow(BaseSwarm):
output_type (str): The type of output format.
max_loops (int): The maximum number of loops for each agent.
auto_generate_prompts (bool): Flag indicating whether to auto-generate prompts for agents.
show_dashboard (bool): Flag indicating whether to show a real-time dashboard.
agent_statuses (dict): Dictionary to track agent statuses.
"""
def __init__(
@@ -58,6 +61,7 @@ class ConcurrentWorkflow(BaseSwarm):
output_type: str = "dict-all-except-first",
max_loops: int = 1,
auto_generate_prompts: bool = False,
show_dashboard: bool = True,
*args,
**kwargs,
):
@@ -76,10 +80,24 @@ class ConcurrentWorkflow(BaseSwarm):
self.max_loops = max_loops
self.auto_generate_prompts = auto_generate_prompts
self.output_type = output_type
self.show_dashboard = show_dashboard
self.agent_statuses = {
agent.agent_name: {"status": "pending", "output": ""}
for agent in agents
}
self.reliability_check()
self.conversation = Conversation()
if self.show_dashboard is True:
self.agents = self.fix_agents()
def fix_agents(self):
if self.show_dashboard is True:
for agent in self.agents:
agent.print_on = False
return self.agents
def reliability_check(self):
try:
if self.agents is None:
@@ -115,7 +133,146 @@ class ConcurrentWorkflow(BaseSwarm):
for agent in self.agents:
agent.auto_generate_prompt = True
def run(
def display_agent_dashboard(
self,
title: str = "🤖 Agent Dashboard",
is_final: bool = False,
) -> None:
"""
Displays the current status of all agents in a beautiful dashboard format.
Args:
title (str): The title of the dashboard.
is_final (bool): Flag indicating whether this is the final dashboard.
"""
agents_data = [
{
"name": agent.agent_name,
"status": self.agent_statuses[agent.agent_name][
"status"
],
"output": self.agent_statuses[agent.agent_name][
"output"
],
}
for agent in self.agents
]
formatter.print_agent_dashboard(agents_data, title, is_final)
def run_with_dashboard(
self,
task: str,
img: Optional[str] = None,
imgs: Optional[List[str]] = None,
):
"""
Executes all agents in the workflow concurrently on the given task.
Now includes real-time dashboard updates.
"""
try:
self.conversation.add(role="User", content=task)
# Reset agent statuses
for agent in self.agents:
self.agent_statuses[agent.agent_name] = {
"status": "pending",
"output": "",
}
# Display initial dashboard if enabled
if self.show_dashboard:
self.display_agent_dashboard()
# Use 95% of available CPU cores (at least one worker)
max_workers = max(1, int(os.cpu_count() * 0.95))
# Create a list to store all futures and their results
futures = []
results = []
def run_agent_with_status(agent, task, img, imgs):
try:
# Update status to running
self.agent_statuses[agent.agent_name][
"status"
] = "running"
if self.show_dashboard:
self.display_agent_dashboard()
# Run the agent
output = agent.run(task=task, img=img, imgs=imgs)
# Update status to completed
self.agent_statuses[agent.agent_name][
"status"
] = "completed"
self.agent_statuses[agent.agent_name][
"output"
] = output
if self.show_dashboard:
self.display_agent_dashboard()
return output
except Exception as e:
# Update status to error
self.agent_statuses[agent.agent_name][
"status"
] = "error"
self.agent_statuses[agent.agent_name][
"output"
] = f"Error: {str(e)}"
if self.show_dashboard:
self.display_agent_dashboard()
raise
# Run agents concurrently using ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
) as executor:
# Submit all agent tasks
futures = [
executor.submit(
run_agent_with_status, agent, task, img, imgs
)
for agent in self.agents
]
# Wait for all futures to complete
concurrent.futures.wait(futures)
# Collect results in submission order
for future, agent in zip(futures, self.agents):
try:
output = future.result()
results.append((agent.agent_name, output))
except Exception as e:
logger.error(
f"Agent {agent.agent_name} failed: {str(e)}"
)
results.append(
(agent.agent_name, f"Error: {str(e)}")
)
# Add all results to conversation
for agent_name, output in results:
self.conversation.add(role=agent_name, content=output)
# Display final dashboard if enabled
if self.show_dashboard:
self.display_agent_dashboard(
"🎉 Final Agent Dashboard", is_final=True
)
return history_output_formatter(
conversation=self.conversation,
type=self.output_type,
)
finally:
# Always clean up the dashboard display
if self.show_dashboard:
formatter.stop_dashboard()
def _run(
self,
task: str,
img: Optional[str] = None,
@@ -167,6 +324,20 @@ class ConcurrentWorkflow(BaseSwarm):
type=self.output_type,
)
def run(
self,
task: str,
img: Optional[str] = None,
imgs: Optional[List[str]] = None,
):
"""
Executes all agents in the workflow concurrently on the given task.
"""
if self.show_dashboard:
return self.run_with_dashboard(task, img, imgs)
else:
return self._run(task, img, imgs)
def batch_run(
self,
tasks: List[str],

@@ -6,6 +6,7 @@ from loguru import logger
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.structs.ma_utils import create_agent_map
from swarms.utils.generate_keys import generate_api_key
from swarms.utils.history_output_formatter import (
history_output_formatter,
@@ -24,12 +25,6 @@ class AgentNotFoundError(InteractiveGroupChatError):
pass
class NoMentionedAgentsError(InteractiveGroupChatError):
"""Raised when no agents are mentioned in the task"""
pass
class InvalidTaskFormatError(InteractiveGroupChatError):
"""Raised when the task format is invalid"""
@@ -294,14 +289,7 @@ class InteractiveGroupChat:
# Initialize conversation history
self.conversation = Conversation(time_enabled=True)
# Create a mapping of agent names to agents for easy lookup
self.agent_map = {}
for agent in agents:
if isinstance(agent, Agent):
self.agent_map[agent.agent_name] = agent
elif callable(agent):
# For callable functions, use the function name as the agent name
self.agent_map[agent.__name__] = agent
self.agent_map = create_agent_map(self.agents)
self._validate_initialization()
self._setup_conversation_context()
@@ -398,7 +386,7 @@ class InteractiveGroupChat:
Start an interactive terminal session for chatting with agents.
This method creates a REPL (Read-Eval-Print Loop) that allows users to:
- Chat with agents using @mentions
- Chat with agents using @mentions (optional)
- See available agents and their descriptions
- Exit the session using 'exit' or 'quit'
- Get help using 'help' or '?'
@@ -426,7 +414,9 @@ class InteractiveGroupChat:
print("- Type 'help' or '?' for help")
print("- Type 'exit' or 'quit' to end the session")
print("- Type 'speaker' to change speaker function")
print("- Use @agent_name to mention agents")
print(
"- Use @agent_name to mention specific agents (optional)"
)
print("\nStart chatting:")
while True:
@@ -441,9 +431,11 @@ class InteractiveGroupChat:
if user_input.lower() in ["help", "?"]:
print("\nHelp:")
print("1. Mention agents using @agent_name")
print(
"2. You can mention multiple agents in one task"
"1. You can mention specific agents using @agent_name (optional)"
)
print(
"2. If no agents are mentioned, they will be selected automatically"
)
print("3. Available agents:")
for name in self.agent_map:
@@ -513,10 +505,6 @@ class InteractiveGroupChat:
print("\nChat:")
# print(response)
except NoMentionedAgentsError:
print(
"\nError: Please mention at least one agent using @agent_name"
)
except AgentNotFoundError as e:
print(f"\nError: {str(e)}")
except Exception as e:
@@ -699,13 +687,13 @@ Remember: You are part of a team. Your response should reflect that you've read,
def _extract_mentions(self, task: str) -> List[str]:
"""
Extracts @mentions from the task.
Extracts @mentions from the task. If no mentions are found, returns all available agents.
Args:
task (str): The input task
Returns:
List[str]: List of mentioned agent names
List[str]: List of mentioned agent names or all agent names if no mentions
Raises:
InvalidTaskFormatError: If the task format is invalid
@@ -713,11 +701,17 @@ Remember: You are part of a team. Your response should reflect that you've read,
try:
# Find all @mentions using regex
mentions = re.findall(r"@(\w+)", task)
return [
valid_mentions = [
mention
for mention in mentions
if mention in self.agent_map
]
# If no valid mentions found, return all available agents
if not valid_mentions:
return list(self.agent_map.keys())
return valid_mentions
except Exception as e:
logger.error(f"Error extracting mentions: {e}")
raise InvalidTaskFormatError(f"Invalid task format: {e}")
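The mention-extraction change can be exercised in isolation. A sketch of the pattern with a fixed agent map (not the class method itself):

```python
import re

def extract_mentions(task, agent_map):
    # Find @mentions; keep only known agents. If none are valid,
    # fall back to every agent so the chat still proceeds.
    mentions = re.findall(r"@(\w+)", task)
    valid = [m for m in mentions if m in agent_map]
    return valid if valid else list(agent_map.keys())

agents = {"researcher": None, "analyst": None}
print(extract_mentions("@researcher analyze TSLA", agents))  # -> ['researcher']
print(extract_mentions("analyze TSLA", agents))  # -> ['researcher', 'analyst']
```

Note that `\w` does not match hyphens, so hyphenated names like `Market-Researcher` cannot be @mentioned with this regex; such mentions simply fall through to the all-agents fallback.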
@@ -810,6 +804,149 @@ Remember: You are part of a team. Your response should reflect that you've read,
# Fallback to original order
return mentioned_agents
def _process_dynamic_speakers(
self,
mentioned_agents: List[str],
img: Optional[str],
imgs: Optional[List[str]],
) -> None:
"""
Process responses using the dynamic speaker function.
"""
# Get strategy from speaker state (default to sequential)
strategy = self.speaker_state.get("strategy", "sequential")
# Track which agents have spoken to ensure all get a chance
spoken_agents = set()
last_response = ""
max_iterations = (
len(mentioned_agents) * 3
) # Allow more iterations for parallel
iteration = 0
while iteration < max_iterations and len(spoken_agents) < len(
mentioned_agents
):
# Determine next speaker(s) using dynamic function
next_speakers = self.speaker_function(
mentioned_agents,
last_response,
strategy=strategy,
**self.speaker_state,
)
# Handle both single agent and multiple agents
if isinstance(next_speakers, str):
next_speakers = [next_speakers]
# Filter out invalid agents
valid_next_speakers = [
agent
for agent in next_speakers
if agent in mentioned_agents
]
if not valid_next_speakers:
# If no valid mentions found, randomly select from unspoken agents
unspoken_agents = [
agent
for agent in mentioned_agents
if agent not in spoken_agents
]
if unspoken_agents:
valid_next_speakers = [
random.choice(unspoken_agents)
]
else:
# All agents have spoken, break the loop
break
# Process agents based on strategy, feeding the combined
# response back into the dynamic speaker function
if strategy == "sequential":
response = self._process_sequential_speakers(
valid_next_speakers, spoken_agents, img, imgs
)
elif strategy == "parallel":
response = self._process_parallel_speakers(
valid_next_speakers, spoken_agents, img, imgs
)
else:
response = None
if response:
last_response = response
iteration += 1
def _process_sequential_speakers(
self,
speakers: List[str],
spoken_agents: set,
img: Optional[str],
imgs: Optional[List[str]],
) -> Optional[str]:
"""
Process speakers sequentially. Returns the response of the single
agent processed in this round, if any.
"""
for next_speaker in speakers:
if next_speaker in spoken_agents:
continue  # Skip if already spoken
response = self._get_agent_response(
next_speaker, img, imgs
)
if response:
spoken_agents.add(next_speaker)
return response  # Only process one agent in sequential mode
return None
def _process_parallel_speakers(
self,
speakers: List[str],
spoken_agents: set,
img: Optional[str],
imgs: Optional[List[str]],
) -> Optional[str]:
"""
Process speakers in parallel. Returns the combined responses, if any.
"""
import concurrent.futures
# Get responses from all valid agents
responses = []
with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_agent = {
executor.submit(
self._get_agent_response, agent, img, imgs
): agent
for agent in speakers
if agent not in spoken_agents
}
for future in concurrent.futures.as_completed(
future_to_agent
):
agent = future_to_agent[future]
try:
response = future.result()
if response:
responses.append(response)
spoken_agents.add(agent)
except Exception as e:
logger.error(
f"Error getting response from {agent}: {e}"
)
return "\n\n".join(responses) if responses else None
def _process_static_speakers(
self,
mentioned_agents: List[str],
img: Optional[str],
imgs: Optional[List[str]],
) -> None:
"""
Process responses using a static speaker function.
"""
speaking_order = self._get_speaking_order(mentioned_agents)
logger.info(f"Speaking order determined: {speaking_order}")
# Get responses from mentioned agents in the determined order
for agent_name in speaking_order:
self._get_agent_response(agent_name, img, imgs)
def run(
self,
task: str,
@@ -817,151 +954,33 @@ Remember: You are part of a team. Your response should reflect that you've read,
imgs: Optional[List[str]] = None,
) -> str:
"""
Process a task and get responses from mentioned agents.
If interactive mode is enabled, this will be called by start_interactive_session().
Otherwise, it can be called directly for single task processing.
Process a task and get responses from agents. If no agents are mentioned,
all agents participate.
"""
try:
# Extract mentioned agents
mentioned_agents = self._extract_mentions(task)
if not mentioned_agents:
raise NoMentionedAgentsError(
"No valid agents mentioned in the task"
)
# Extract mentioned agents (falls back to all agents if none mentioned)
mentioned_agents = self._extract_mentions(task)
# Add user task to conversation
self.conversation.add(role="User", content=task)
# Handle dynamic speaker function differently
# Process responses based on speaker function type
if self.speaker_function == random_dynamic_speaker:
# Get strategy from speaker state (default to sequential)
strategy = self.speaker_state.get(
"strategy", "sequential"
self._process_dynamic_speakers(
mentioned_agents, img, imgs
)
# For dynamic speaker, we'll determine the next speaker after each response
# Track which agents have spoken to ensure all get a chance
spoken_agents = set()
last_response = ""
max_iterations = (
len(mentioned_agents) * 3
) # Allow more iterations for parallel
iteration = 0
while iteration < max_iterations and len(
spoken_agents
) < len(mentioned_agents):
# Determine next speaker(s) using dynamic function
next_speakers = self.speaker_function(
mentioned_agents, # Use all mentioned agents, not remaining_agents
last_response,
strategy=strategy,
**self.speaker_state,
)
# Handle both single agent and multiple agents
if isinstance(next_speakers, str):
next_speakers = [next_speakers]
# Filter out invalid agents
valid_next_speakers = [
agent
for agent in next_speakers
if agent in mentioned_agents
]
if not valid_next_speakers:
# If no valid mentions found, randomly select from unspoken agents
unspoken_agents = [
agent
for agent in mentioned_agents
if agent not in spoken_agents
]
if unspoken_agents:
valid_next_speakers = [
random.choice(unspoken_agents)
]
else:
# All agents have spoken, break the loop
break
# Process agents based on strategy
if strategy == "sequential":
# Process one agent at a time
for next_speaker in valid_next_speakers:
if next_speaker in spoken_agents:
continue # Skip if already spoken
response = self._get_agent_response(
next_speaker, img, imgs
)
if response:
last_response = response
spoken_agents.add(next_speaker)
break # Only process one agent in sequential mode
elif strategy == "parallel":
# Process all mentioned agents in parallel
import concurrent.futures
# Get responses from all valid agents
responses = []
with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_agent = {
executor.submit(
self._get_agent_response,
agent,
img,
imgs,
): agent
for agent in valid_next_speakers
if agent not in spoken_agents
}
for (
future
) in concurrent.futures.as_completed(
future_to_agent
):
agent = future_to_agent[future]
try:
response = future.result()
if response:
responses.append(response)
spoken_agents.add(agent)
except Exception as e:
logger.error(
f"Error getting response from {agent}: {e}"
)
# Combine responses for next iteration
if responses:
last_response = "\n\n".join(responses)
iteration += 1
else:
# For non-dynamic speaker functions, use the original logic
                speaking_order = self._get_speaking_order(
                    mentioned_agents
                )
logger.info(
f"Speaking order determined: {speaking_order}"
)
# Get responses from mentioned agents in the determined order
for agent_name in speaking_order:
response = self._get_agent_response(
agent_name, img, imgs
)
return history_output_formatter(
self.conversation, self.output_type
)
except InteractiveGroupChatError as e:
logger.error(f"GroupChat error: {e}")
raise
except Exception as e:
logger.error(f"Unexpected error: {e}")
            raise InteractiveGroupChatError(
                f"Unexpected error occurred: {str(e)}"
            )
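The sequential/parallel branching above can be reduced to a standalone sketch. `run_agents` below is a simplified stand-in for the group-chat dispatch loop, not part of the library; it assumes agents are plain callables:

```python
import concurrent.futures

def run_agents(agents, task, strategy="sequential"):
    """Dispatch callable agents over a task, one at a time or concurrently."""
    if strategy == "sequential":
        # Each response feeds the next agent, mirroring turn-based chat.
        responses = []
        last = task
        for agent in agents:
            last = agent(last)
            responses.append(last)
        return responses
    # Parallel: every agent sees the same task; collect results as they finish.
    responses = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {executor.submit(agent, task): agent for agent in agents}
        for future in concurrent.futures.as_completed(futures):
            try:
                responses.append(future.result())
            except Exception as exc:
                # A failing agent should not sink the whole round.
                responses.append(f"[error: {exc}]")
    return responses
```

In sequential mode only one agent speaks per turn; in parallel mode `as_completed` yields futures in finish order, which is why the real code joins responses afterwards rather than relying on that order.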
@ -1,8 +1,11 @@
from typing import Dict, List, Any, Optional, Union, Callable
import random
from swarms.prompts.collaborative_prompts import (
get_multi_agent_collaboration_prompt_one,
)
from functools import lru_cache
from loguru import logger
def list_all_agents(
@ -116,3 +119,65 @@ def set_random_models_for_agents(
else:
setattr(agents, "model_name", random.choice(model_names))
return agents
@lru_cache(maxsize=128)
def _create_agent_map_cached(
agent_tuple: tuple,
) -> Dict[str, Union[Callable, Any]]:
"""Internal cached version of create_agent_map that takes a tuple for hashability."""
try:
return {
(
agent.agent_name
                if hasattr(agent, "agent_name")
else agent.__name__
): agent
for agent in agent_tuple
}
except (AttributeError, TypeError) as e:
logger.error(f"Error creating agent map: {e}")
return {}
def create_agent_map(
agents: List[Union[Callable, Any]],
) -> Dict[str, Union[Callable, Any]]:
"""Creates a map of agent names to agents for fast lookup.
This function is optimized with LRU caching to avoid recreating maps for identical agent lists.
The cache stores up to 128 different agent map configurations.
Args:
agents (List[Union[Callable, Any]]): List of agents to create a map of. Each agent should either be:
- A callable with a __name__ attribute
- An object with an agent_name attribute
Returns:
Dict[str, Union[Callable, Any]]: Map of agent names to agents
Examples:
>>> def agent1(): pass
>>> def agent2(): pass
>>> agents = [agent1, agent2]
>>> agent_map = create_agent_map(agents)
>>> print(agent_map.keys())
dict_keys(['agent1', 'agent2'])
>>> class Agent:
... def __init__(self, name):
... self.agent_name = name
>>> agents = [Agent("bot1"), Agent("bot2")]
>>> agent_map = create_agent_map(agents)
>>> print(agent_map.keys())
dict_keys(['bot1', 'bot2'])
Raises:
ValueError: If agents list is empty
TypeError: If any agent lacks required name attributes
"""
if not agents:
raise ValueError("Agents list cannot be empty")
# Convert list to tuple for hashability
return _create_agent_map_cached(tuple(agents))

@ -5,9 +5,27 @@ from typing import Any, Callable, Dict, List, Optional
import threading

from rich.console import Console
from rich.live import Live
from rich.panel import Panel
from rich.progress import (
Progress,
SpinnerColumn,
TextColumn,
)
from rich.table import Table
from rich.text import Text
from rich.spinner import Spinner
# Global lock to ensure only a single Rich Live context is active at any moment.
# Rich's Live render is **not** thread-safe; concurrent Live contexts on the same
# console raise runtime errors. Using a module-level lock serialises access and
# prevents crashes when multiple agents stream simultaneously in different
# threads (e.g., in ConcurrentWorkflow).
live_render_lock = threading.Lock()
# Global Live display for the dashboard
dashboard_live = None
# Create a spinner for loading animation
spinner = Spinner("dots", style="yellow")
def choose_random_color():
@ -37,6 +55,53 @@ class Formatter:
Initializes the Formatter with a Rich Console instance.
"""
self.console = Console()
self._dashboard_live = None
        self._spinner_frames = [
            "⠋",
            "⠙",
            "⠹",
            "⠸",
            "⠼",
            "⠴",
            "⠦",
            "⠧",
            "⠇",
            "⠏",
        ]
self._spinner_idx = 0
def _get_status_with_loading(self, status: str) -> Text:
"""
Creates a status text with loading animation for running status.
"""
if status.lower() == "running":
# Create loading bar effect
self._spinner_idx = (self._spinner_idx + 1) % len(
self._spinner_frames
)
spinner_char = self._spinner_frames[self._spinner_idx]
            progress_bar = "█" * (self._spinner_idx % 5) + "░" * (
                4 - (self._spinner_idx % 5)
            )
return Text(
f"{spinner_char} {status} {progress_bar}",
style="bold yellow",
)
# Style other statuses
status_style = {
"completed": "bold green",
"pending": "bold red",
"error": "bold red",
}.get(status.lower(), "white")
        status_symbol = {
            "completed": "✓",
            "pending": "○",
            "error": "✗",
        }.get(status.lower(), "•")
return Text(f"{status_symbol} {status}", style=status_style)
def _print_panel(
self, content: str, title: str = "", style: str = "bold blue"
@ -209,58 +274,155 @@ class Formatter:
complete_response = ""
chunks_collected = []
# Acquire the lock so that only one Live panel is active at a time.
# Other threads will wait here until the current streaming completes,
# avoiding Rich.Live concurrency errors.
with live_render_lock:
# TRUE streaming with Rich's automatic text wrapping
with Live(
create_streaming_panel(streaming_text),
console=self.console,
refresh_per_second=20,
) as live:
try:
for part in streaming_response:
if (
hasattr(part, "choices")
and part.choices
and part.choices[0].delta.content
):
# Add ONLY the new chunk to the Text object with random color style
chunk = part.choices[0].delta.content
streaming_text.append(
chunk, style=text_style
)
complete_response += chunk
# Collect chunks if requested
if collect_chunks:
chunks_collected.append(chunk)
# Call chunk callback if provided
if on_chunk_callback:
on_chunk_callback(chunk)
# Update display with new text - Rich handles all wrapping automatically
live.update(
create_streaming_panel(
streaming_text, is_complete=False
)
)
# Final update to show completion
live.update(
create_streaming_panel(
streaming_text, is_complete=True
)
)
except Exception as e:
# Handle any streaming errors gracefully
streaming_text.append(
f"\n[Error: {str(e)}]", style="bold red"
)
)
live.update(
create_streaming_panel(
streaming_text, is_complete=True
)
)
return complete_response
def _create_dashboard_table(
self, agents_data: List[Dict[str, Any]], title: str
) -> Panel:
"""
Creates the dashboard table with the current agent statuses.
"""
# Create main table
table = Table(
show_header=True,
header_style="bold magenta",
expand=True,
title=title,
title_style="bold cyan",
border_style="bright_blue",
show_lines=True, # Add lines between rows
)
# Add columns with adjusted widths
table.add_column(
"Agent Name", style="cyan", width=30, no_wrap=True
)
table.add_column(
"Status", style="green", width=20, no_wrap=True
) # Increased width for loading animation
table.add_column(
"Output", style="white", width=100, overflow="fold"
) # Allow text to wrap
# Add rows for each agent
for agent in agents_data:
name = Text(agent["name"], style="bold cyan")
status = self._get_status_with_loading(agent["status"])
output = Text(str(agent["output"]))
table.add_row(name, status, output)
# Create a panel to wrap the table
dashboard_panel = Panel(
table,
border_style="bright_blue",
padding=(1, 2),
title=f"[bold cyan]{title}[/bold cyan] - Total Agents: [bold green]{len(agents_data)}[/bold green]",
expand=True, # Make panel expand to full width
)
return dashboard_panel
def print_agent_dashboard(
self,
agents_data: List[Dict[str, Any]],
title: str = "🤖 Agent Dashboard",
is_final: bool = False,
) -> None:
"""
Displays a beautiful dashboard showing agent information in a panel-like spreadsheet format.
Updates in place instead of printing multiple times.
Args:
agents_data (List[Dict[str, Any]]): List of dictionaries containing agent information.
Each dict should have: name, status, output
title (str): The title of the dashboard.
is_final (bool): Whether this is the final update of the dashboard.
"""
with live_render_lock:
if self._dashboard_live is None:
# Create new Live display if none exists
self._dashboard_live = Live(
self._create_dashboard_table(agents_data, title),
console=self.console,
refresh_per_second=10, # Increased refresh rate
transient=False, # Make display persistent
)
self._dashboard_live.start()
else:
# Update existing Live display
self._dashboard_live.update(
self._create_dashboard_table(agents_data, title)
)
# If this is the final update, add a newline to separate from future output
if is_final:
self.console.print() # Add blank line after final display
def stop_dashboard(self):
"""
Stops and cleans up the dashboard display.
"""
if self._dashboard_live is not None:
self._dashboard_live.stop()
self.console.print() # Add blank line after stopping
self._dashboard_live = None
formatter = Formatter()
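The `live_render_lock` pattern — serialising entire render sessions rather than individual writes — can be shown without the Rich dependency. In this sketch, `frames` stands in for the shared console:

```python
import threading

render_lock = threading.Lock()
frames = []  # stand-in for a console that must not interleave output

def stream(agent_id, chunks):
    # Hold the lock for the whole streaming session, not per write,
    # so one agent's frames are never interleaved with another's.
    with render_lock:
        for chunk in chunks:
            frames.append((agent_id, chunk))

threads = [
    threading.Thread(target=stream, args=(i, ["a", "b", "c"]))
    for i in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Locking per session is coarser than locking per `append`, but that coarseness is the point: Rich allows only one active `Live` context per console, so the whole context-manager body must be the critical section.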

@ -1,57 +1,130 @@
from concurrent.futures import Future
from unittest.mock import Mock, create_autospec, patch
from swarms.structs import Agent, ConcurrentWorkflow, Task
def test_add():
workflow = ConcurrentWorkflow(max_workers=2)
task = Mock(spec=Task)
workflow.add(task)
assert task in workflow.tasks
def test_run():
workflow = ConcurrentWorkflow(max_workers=2)
task1 = create_autospec(Task)
task2 = create_autospec(Task)
workflow.add(task1)
workflow.add(task2)
with patch(
"concurrent.futures.ThreadPoolExecutor"
) as mock_executor:
future1 = Future()
future1.set_result(None)
future2 = Future()
future2.set_result(None)
mock_executor.return_value.__enter__.return_value.submit.side_effect = [
future1,
future2,
]
mock_executor.return_value.__enter__.return_value.as_completed.return_value = [
future1,
future2,
]
workflow.run()
task1.execute.assert_called_once()
task2.execute.assert_called_once()
def test_execute_task():
workflow = ConcurrentWorkflow(max_workers=2)
task = create_autospec(Task)
workflow._execute_task(task)
task.execute.assert_called_once()
def test_agent_execution():
workflow = ConcurrentWorkflow(max_workers=2)
agent = create_autospec(Agent)
task = Task(agent)
workflow.add(task)
workflow._execute_task(task)
agent.execute.assert_called_once()
from swarms import Agent
from swarms.structs.concurrent_workflow import ConcurrentWorkflow
def test_basic_workflow():
"""Test basic workflow initialization and execution"""
# Create test agents
agent1 = Agent(
agent_name="Test-Agent-1",
system_prompt="You are a test agent 1",
model_name="claude-3-sonnet-20240229",
max_loops=1,
)
agent2 = Agent(
agent_name="Test-Agent-2",
system_prompt="You are a test agent 2",
model_name="claude-3-sonnet-20240229",
max_loops=1,
)
# Create workflow
workflow = ConcurrentWorkflow(
name="test-workflow", agents=[agent1, agent2], max_loops=1
)
# Run workflow
result = workflow.run("Test task")
# Verify results
assert len(result) == 2
assert all(isinstance(r, dict) for r in result)
assert all("agent" in r and "output" in r for r in result)
def test_dashboard_workflow():
"""Test workflow with dashboard enabled"""
agent = Agent(
agent_name="Dashboard-Test-Agent",
system_prompt="You are a test agent",
model_name="claude-3-sonnet-20240229",
max_loops=1,
)
workflow = ConcurrentWorkflow(
name="dashboard-test",
agents=[agent],
max_loops=1,
show_dashboard=True,
)
result = workflow.run("Test task")
assert len(result) == 1
assert isinstance(result[0], dict)
assert "agent" in result[0]
assert "output" in result[0]
def test_multiple_agents():
"""Test workflow with multiple agents"""
agents = [
Agent(
agent_name=f"Agent-{i}",
system_prompt=f"You are test agent {i}",
model_name="claude-3-sonnet-20240229",
max_loops=1,
)
for i in range(3)
]
workflow = ConcurrentWorkflow(
name="multi-agent-test", agents=agents, max_loops=1
)
result = workflow.run("Multi-agent test task")
assert len(result) == 3
assert all(isinstance(r, dict) for r in result)
assert all("agent" in r and "output" in r for r in result)
def test_error_handling():
"""Test workflow error handling"""
# Create an agent that will raise an exception
agent = Agent(
agent_name="Error-Agent",
system_prompt="You are a test agent that will raise an error",
model_name="invalid-model", # This will cause an error
max_loops=1,
)
workflow = ConcurrentWorkflow(
name="error-test", agents=[agent], max_loops=1
)
try:
workflow.run("Test task")
assert False, "Expected an error but none was raised"
except Exception as e:
assert str(e) != "" # Verify we got an error message
def test_max_loops():
"""Test workflow respects max_loops setting"""
agent = Agent(
agent_name="Loop-Test-Agent",
system_prompt="You are a test agent",
model_name="claude-3-sonnet-20240229",
max_loops=2,
)
workflow = ConcurrentWorkflow(
name="loop-test",
agents=[agent],
max_loops=1, # This should override agent's max_loops
)
result = workflow.run("Test task")
assert len(result) == 1
assert isinstance(result[0], dict)
if __name__ == "__main__":
test_basic_workflow()
test_dashboard_workflow()
test_multiple_agents()
test_error_handling()
test_max_loops()
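The tests above call a real model endpoint. A dependency-free variant can exercise the same `{agent, output}` result shape with stub agents; `StubAgent` and `run_concurrently` are illustrative stand-ins, not part of swarms:

```python
from concurrent.futures import ThreadPoolExecutor

class StubAgent:
    """Deterministic stand-in for an LLM-backed Agent."""

    def __init__(self, agent_name):
        self.agent_name = agent_name

    def run(self, task):
        return f"{self.agent_name}: {task}"

def run_concurrently(agents, task):
    # Fan the task out to every agent; map() preserves input order.
    with ThreadPoolExecutor(max_workers=len(agents)) as pool:
        outputs = list(pool.map(lambda a: a.run(task), agents))
    # Same record shape the workflow tests assert on.
    return [
        {"agent": a.agent_name, "output": o}
        for a, o in zip(agents, outputs)
    ]

results = run_concurrently(
    [StubAgent(f"Agent-{i}") for i in range(3)], "ping"
)
```

Keeping the model call behind a `run(task)` method is what makes this swap possible: anything with that method and an `agent_name` attribute satisfies the shape the assertions check.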
