diff --git a/docs/swarms/structs/swarm_router.md b/docs/swarms/structs/swarm_router.md index 2a5deeb1..8ccf1203 100644 --- a/docs/swarms/structs/swarm_router.md +++ b/docs/swarms/structs/swarm_router.md @@ -4,13 +4,13 @@ The `SwarmRouter` class is a flexible routing system designed to manage differen Full Path: `from swarms.structs.swarm_router` - ## Initialization Parameters Main class for routing tasks to different swarm types. | Attribute | Type | Description | | --- | --- | --- | +| `id` | str | Unique identifier for the SwarmRouter instance (auto-generated if not provided) | | `name` | str | Name of the SwarmRouter instance | | `description` | str | Description of the SwarmRouter's purpose | | `max_loops` | int | Maximum number of loops to perform | @@ -24,35 +24,84 @@ Main class for routing tasks to different swarm types. | `rules` | str | Rules to inject into every agent | | `documents` | List[str] | List of document file paths | | `output_type` | OutputType | Output format type (e.g., "string", "dict", "list", "json", "yaml", "xml") | -| `no_cluster_ops` | bool | Flag to disable cluster operations | | `speaker_fn` | callable | Speaker function for GroupChat swarm type | | `load_agents_from_csv` | bool | Flag to enable/disable loading agents from CSV | | `csv_file_path` | str | Path to the CSV file for loading agents | | `return_entire_history` | bool | Flag to enable/disable returning the entire conversation history | | `multi_agent_collab_prompt` | bool | Whether to enable multi-agent collaboration prompts | +| `list_all_agents` | bool | Flag to enable/disable listing all agents to each other | +| `conversation` | Any | Conversation object for managing agent interactions | +| `agents_config` | Optional[Dict[Any, Any]] | Configuration dictionary for agents | +| `speaker_function` | str | Speaker function name for InteractiveGroupChat swarm type | +| `heavy_swarm_loops_per_agent` | int | Number of loops per agent for HeavySwarm (default: 1) | +| `heavy_swarm_question_agent_model_name` | str | Model name for the question agent in HeavySwarm (default: "gpt-4.1") | +| `heavy_swarm_worker_model_name` | str | Model name for worker agents in HeavySwarm (default: "gpt-4.1") | +| `heavy_swarm_swarm_show_output` | bool | Flag to show output for HeavySwarm (default: True) | +| `telemetry_enabled` | bool | Flag to enable/disable telemetry logging (default: False) | +| `council_judge_model_name` | str | Model name for the judge in CouncilAsAJudge (default: "gpt-4o-mini") | +| `verbose` | bool | Flag to enable/disable verbose logging (default: False) | +| `worker_tools` | List[Callable] | List of tools available to worker agents | +| `aggregation_strategy` | str | Aggregation strategy for HeavySwarm (default: "synthesis") | -#### Methods: +### Methods -| Method | Parameters | Description | -| --- | --- | --- | -| `__init__` | `name: str = "swarm-router", description: str = "Routes your task to the desired swarm", max_loops: int = 1, agents: List[Union[Agent, Callable]] = [], swarm_type: SwarmType = "SequentialWorkflow", autosave: bool = False, rearrange_flow: str = None, return_json: bool = False, auto_generate_prompts: bool = False, shared_memory_system: Any = None, rules: str = None, documents: List[str] = [], output_type: OutputType = "dict", no_cluster_ops: bool = False, speaker_fn: callable = None, load_agents_from_csv: bool = False, csv_file_path: str = None, return_entire_history: bool = True, multi_agent_collab_prompt: bool = True` | Initialize the SwarmRouter | -| `setup` | None | Set up the SwarmRouter by activating APE and handling shared memory and rules | -| `activate_shared_memory` | None | Activate shared memory with all agents | -| `handle_rules` | None | Inject rules to every agent | -| `activate_ape` | None | Activate automatic prompt engineering for agents that support it | -| `reliability_check` | None | Perform reliability checks on the SwarmRouter configuration | -| `_create_swarm` | `task: str = None, *args, **kwargs` | Create and return the specified swarm type | -| `update_system_prompt_for_agent_in_swarm` | None | Update system prompts for all agents with collaboration prompts | -| `_log` | `level: str, message: str, task: str = "", metadata: Dict[str, Any] = None` | Create a log entry | -| `_run` | `task: str, img: Optional[str] = None, model_response: Optional[str] = None, *args, **kwargs` | Run the specified task on the selected swarm type | -| `run` | `task: str, img: Optional[str] = None, model_response: Optional[str] = None, *args, **kwargs` | Execute a task on the selected swarm type | -| `__call__` | `task: str, *args, **kwargs` | Make the SwarmRouter instance callable | -| `batch_run` | `tasks: List[str], *args, **kwargs` | Execute multiple tasks in sequence | -| `async_run` | `task: str, *args, **kwargs` | Execute a task asynchronously | -| `get_logs` | None | Retrieve all logged entries | -| `concurrent_run` | `task: str, *args, **kwargs` | Execute a task using concurrent execution | -| `concurrent_batch_run` | `tasks: List[str], *args, **kwargs` | Execute multiple tasks concurrently | +#### `run()` + +Execute a task on the selected swarm type. + +**Input Parameters:** + +| Parameter | Type | Required | Default | Description | +| --- | --- | --- | --- | --- | +| `task` | `Optional[str]` | No | `None` | The task to be executed by the swarm | +| `img` | `Optional[str]` | No | `None` | Path to an image file for vision tasks | +| `tasks` | `Optional[List[str]]` | No | `None` | List of tasks (used for BatchedGridWorkflow) | +| `*args` | `Any` | No | - | Variable length argument list | +| `**kwargs` | `Any` | No | - | Arbitrary keyword arguments | + +**Output:** + +| Type | Description | +| --- | --- | +| `Any` | The result of the swarm's execution. The exact type depends on the `output_type` configuration (e.g., `str`, `dict`, `list`, `json`, `yaml`, `xml`) | + +**Example:** + +```python +result = router.run( + task="Analyze the market trends and provide recommendations", + img="chart.png" # Optional +) +``` + +--- + +### `batch_run()` + +Execute multiple tasks in sequence on the selected swarm type. + +**Input Parameters:** +| Parameter | Type | Required | Default | Description | +| --- | --- | --- | --- | --- | +| `tasks` | `List[str]` | Yes | - | List of tasks to be executed sequentially | +| `img` | `Optional[str]` | No | `None` | Path to an image file for vision tasks | +| `imgs` | `Optional[List[str]]` | No | `None` | List of image file paths for vision tasks | +| `*args` | `Any` | No | - | Variable length argument list | +| `**kwargs` | `Any` | No | - | Arbitrary keyword arguments | + +**Output:** + +| Type | Description | +| --- | --- | +| `List[Any]` | A list of results from the swarm's execution, one result per task. Each result type depends on the `output_type` configuration | + +**Example:** + +```python +tasks = ["Analyze Q1 report", "Summarize competitor landscape", "Evaluate market trends"] +results = router.batch_run(tasks, img="report.png") # Optional img parameter +``` ## Available Swarm Types @@ -62,7 +111,6 @@ The `SwarmRouter` supports many various multi-agent architectures for various ap |------------|-------------| | `AgentRearrange` | Optimizes agent arrangement for task execution | | `MixtureOfAgents` | Combines multiple agent types for diverse tasks | -| `SpreadSheetSwarm` | Uses spreadsheet-like operations for task management | | `SequentialWorkflow` | Executes tasks sequentially | | `ConcurrentWorkflow` | Executes tasks in parallel | | `GroupChat` | Facilitates communication among agents in a group chat format | @@ -73,10 +121,10 @@ The `SwarmRouter` supports many various multi-agent architectures for various ap | `MALT` | Multi-Agent Language Tasks | | `CouncilAsAJudge` | Council-based judgment system | | `InteractiveGroupChat` | Interactive group chat with user participation | +| `HeavySwarm` | Heavy swarm architecture with question and worker agents | +| `BatchedGridWorkflow` | Batched grid workflow for parallel task processing | | `auto` | Automatically selects best swarm type via embedding search | - - ## Basic Usage ```python @@ -129,9 +177,13 @@ router = SwarmRouter( if __name__ == "__main__": # Run a comprehensive private equity document analysis task result = router.run( - "Where is the best place to find template term sheets for series A startups? Provide links and references" + task="Where is the best place to find template term sheets for series A startups? Provide links and references", + img=None # Optional: provide image path for vision tasks ) print(result) + + # For BatchedGridWorkflow, you can pass multiple tasks: + # result = router.run(tasks=["Task 1", "Task 2", "Task 3"]) ``` ## Advanced Usage @@ -225,22 +277,6 @@ mixture_router = SwarmRouter( result = mixture_router.run("Evaluate the potential acquisition of TechStartup Inc.") ``` -### SpreadSheetSwarm - -Use Case: Collaborative data processing and analysis. - -```python -spreadsheet_router = SwarmRouter( - name="DataProcessor", - description="Collaborative data processing and analysis", - max_loops=1, - agents=[data_cleaner, statistical_analyzer, visualizer], - swarm_type="SpreadSheetSwarm" -) - -result = spreadsheet_router.run("Process and visualize customer churn data") -``` - ### SequentialWorkflow Use Case: Step-by-step document analysis and report generation. @@ -379,6 +415,47 @@ result = interactive_chat_router.run("Discuss the market trends and provide inte The InteractiveGroupChat allows for dynamic interaction between agents and users, enabling real-time participation in group discussions and decision-making processes. This is particularly useful for scenarios requiring human input or validation during the conversation flow. +### HeavySwarm + +Use Case: Complex task decomposition with question and worker agents. + +```python +heavy_swarm_router = SwarmRouter( + name="HeavySwarm", + description="Complex task decomposition and execution", + swarm_type="HeavySwarm", + heavy_swarm_loops_per_agent=2, + heavy_swarm_question_agent_model_name="gpt-4.1", + heavy_swarm_worker_model_name="gpt-4.1", + heavy_swarm_swarm_show_output=True, + worker_tools=[tool1, tool2], + aggregation_strategy="synthesis", + output_type="string" +) + +result = heavy_swarm_router.run("Analyze market trends and provide comprehensive recommendations") +``` + +HeavySwarm uses a question agent to decompose complex tasks and worker agents to execute subtasks, making it ideal for complex problem-solving scenarios. + +### BatchedGridWorkflow + +Use Case: Parallel processing of multiple tasks in a batched grid format. + +```python +batched_grid_router = SwarmRouter( + name="BatchedGridWorkflow", + description="Process multiple tasks in parallel batches", + max_loops=1, + agents=[agent1, agent2, agent3], + swarm_type="BatchedGridWorkflow" +) + +result = batched_grid_router.run(tasks=["Task 1", "Task 2", "Task 3"]) +``` + +BatchedGridWorkflow is designed for efficiently processing multiple tasks in parallel batches, optimizing resource utilization. + ## Advanced Features ### Processing Documents @@ -402,15 +479,7 @@ To process multiple tasks in a batch: ```python tasks = ["Analyze Q1 report", "Summarize competitor landscape", "Evaluate market trends"] -results = router.batch_run(tasks) -``` - -### Asynchronous Execution - -For asynchronous task execution: - -```python -result = await router.async_run("Generate financial projections") +results = router.batch_run(tasks, img="image.png") # Optional: img parameter for image tasks ``` ### Concurrent Execution @@ -418,16 +487,7 @@ result = await router.async_run("Generate financial projections") To run a single task concurrently: ```python -result = router.concurrent_run("Analyze multiple data streams") -``` - -### Concurrent Batch Processing - -To process multiple tasks concurrently: - -```python -tasks = ["Task 1", "Task 2", "Task 3"] -results = router.concurrent_batch_run(tasks) +result = router.concurrent_run("Analyze multiple data streams", img="image.png") # Optional: img parameter ``` ### Using the SwarmRouter as a Callable diff --git a/example.py b/example.py index 0a27c20c..42959ded 100644 --- a/example.py +++ b/example.py @@ -1,4 +1,3 @@ -import json from swarms import Agent @@ -12,7 +11,7 @@ agent = Agent( dynamic_context_window=True, streaming_on=False, top_p=None, - output_type="dict", + stream=True, ) out = agent.run( @@ -20,4 +19,5 @@ out = agent.run( n=1, ) -print(json.dumps(out, indent=4)) +for token in out: + print(token, end="", flush=True) diff --git a/examples/multi_agent/debate_examples/business_strategy_debate_example.py b/examples/multi_agent/debate_examples/business_strategy_debate_example.py index 66ee8b62..7dd44c11 100644 --- a/examples/multi_agent/debate_examples/business_strategy_debate_example.py +++ b/examples/multi_agent/debate_examples/business_strategy_debate_example.py @@ -84,4 +84,3 @@ print(history) # Get the final refined answer final_answer = strategy_debate.get_final_answer() print(final_answer) - diff --git a/examples/multi_agent/debate_examples/policy_debate_example.py b/examples/multi_agent/debate_examples/policy_debate_example.py index e8e744c5..a2e7c5ce 100644 --- a/examples/multi_agent/debate_examples/policy_debate_example.py +++ b/examples/multi_agent/debate_examples/policy_debate_example.py @@ -80,4 +80,3 @@ print(result) # Get the final refined answer final_answer = debate_system.get_final_answer() print(final_answer) - diff --git a/examples/multi_agent/debate_examples/technical_architecture_debate_example.py b/examples/multi_agent/debate_examples/technical_architecture_debate_example.py index e59bfa53..24ecf3d1 100644 --- a/examples/multi_agent/debate_examples/technical_architecture_debate_example.py +++ b/examples/multi_agent/debate_examples/technical_architecture_debate_example.py @@ -68,4 +68,3 @@ results = architecture_debate.batched_run(architecture_questions) # Display results for result in results: print(result) - diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 29222e1f..32687894 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -2584,91 +2584,129 @@ class Agent: task=task, *args, **kwargs ) - if hasattr(streaming_response, "__iter__") and not isinstance(streaming_response, str): + if hasattr( + streaming_response, "__iter__" + ) and not isinstance(streaming_response, str): complete_response = "" token_count = 0 final_chunk = None first_chunk = None - + for chunk in streaming_response: if first_chunk is None: first_chunk = chunk - - if hasattr(chunk, "choices") and chunk.choices[0].delta.content: + + if ( + hasattr(chunk, "choices") + and chunk.choices[0].delta.content + ): content = chunk.choices[0].delta.content complete_response += content token_count += 1 - + # Schema per token outputted token_info = { "token_index": token_count, - "model": getattr(chunk, 'model', self.get_current_model()), - "id": getattr(chunk, 'id', ''), - "created": getattr(chunk, 'created', int(time.time())), - "object": getattr(chunk, 'object', 'chat.completion.chunk'), + "model": getattr( + chunk, + "model", + self.get_current_model(), + ), + "id": getattr(chunk, "id", ""), + "created": getattr( + chunk, "created", int(time.time()) + ), + "object": getattr( + chunk, + "object", + "chat.completion.chunk", + ), "token": content, - "system_fingerprint": getattr(chunk, 'system_fingerprint', ''), - "finish_reason": chunk.choices[0].finish_reason, - "citations": getattr(chunk, 'citations', None), - "provider_specific_fields": getattr(chunk, 'provider_specific_fields', None), - "service_tier": getattr(chunk, 'service_tier', 'default'), - "obfuscation": getattr(chunk, 'obfuscation', None), - "usage": getattr(chunk, 'usage', None), + "system_fingerprint": getattr( + chunk, "system_fingerprint", "" + ), + "finish_reason": chunk.choices[ + 0 + ].finish_reason, + "citations": getattr( + chunk, "citations", None + ), + "provider_specific_fields": getattr( + chunk, + "provider_specific_fields", + None, + ), + "service_tier": getattr( + chunk, "service_tier", "default" + ), + "obfuscation": getattr( + chunk, "obfuscation", None + ), + "usage": getattr( + chunk, "usage", None + ), "logprobs": chunk.choices[0].logprobs, - "timestamp": time.time() + "timestamp": time.time(), } - + print(f"ResponseStream {token_info}") - + if streaming_callback is not None: streaming_callback(token_info) - + final_chunk = chunk - - #Final ModelResponse to stream - if final_chunk and hasattr(final_chunk, 'usage') and final_chunk.usage: + + # Final ModelResponse to stream + if ( + final_chunk + and hasattr(final_chunk, "usage") + and final_chunk.usage + ): usage = final_chunk.usage - print(f"ModelResponseStream(id='{getattr(final_chunk, 'id', 'N/A')}', " - f"created={getattr(final_chunk, 'created', 'N/A')}, " - f"model='{getattr(final_chunk, 'model', self.get_current_model())}', " - f"object='{getattr(final_chunk, 'object', 'chat.completion.chunk')}', " - f"system_fingerprint='{getattr(final_chunk, 'system_fingerprint', 'N/A')}', " - f"choices=[StreamingChoices(finish_reason='{final_chunk.choices[0].finish_reason}', " - f"index=0, delta=Delta(provider_specific_fields=None, content=None, role=None, " - f"function_call=None, tool_calls=None, audio=None), logprobs=None)], " - f"provider_specific_fields=None, " - f"usage=Usage(completion_tokens={usage.completion_tokens}, " - f"prompt_tokens={usage.prompt_tokens}, " - f"total_tokens={usage.total_tokens}, " - f"completion_tokens_details=CompletionTokensDetailsWrapper(" - f"accepted_prediction_tokens={usage.completion_tokens_details.accepted_prediction_tokens}, " - f"audio_tokens={usage.completion_tokens_details.audio_tokens}, " - f"reasoning_tokens={usage.completion_tokens_details.reasoning_tokens}, " - f"rejected_prediction_tokens={usage.completion_tokens_details.rejected_prediction_tokens}, " - f"text_tokens={usage.completion_tokens_details.text_tokens}), " - f"prompt_tokens_details=PromptTokensDetailsWrapper(" - f"audio_tokens={usage.prompt_tokens_details.audio_tokens}, " - f"cached_tokens={usage.prompt_tokens_details.cached_tokens}, " - f"text_tokens={usage.prompt_tokens_details.text_tokens}, " - f"image_tokens={usage.prompt_tokens_details.image_tokens})))") + print( + f"ModelResponseStream(id='{getattr(final_chunk, 'id', 'N/A')}', " + f"created={getattr(final_chunk, 'created', 'N/A')}, " + f"model='{getattr(final_chunk, 'model', self.get_current_model())}', " + f"object='{getattr(final_chunk, 'object', 'chat.completion.chunk')}', " + f"system_fingerprint='{getattr(final_chunk, 'system_fingerprint', 'N/A')}', " + f"choices=[StreamingChoices(finish_reason='{final_chunk.choices[0].finish_reason}', " + f"index=0, delta=Delta(provider_specific_fields=None, content=None, role=None, " + f"function_call=None, tool_calls=None, audio=None), logprobs=None)], " + f"provider_specific_fields=None, " + f"usage=Usage(completion_tokens={usage.completion_tokens}, " + f"prompt_tokens={usage.prompt_tokens}, " + f"total_tokens={usage.total_tokens}, " + f"completion_tokens_details=CompletionTokensDetailsWrapper(" + f"accepted_prediction_tokens={usage.completion_tokens_details.accepted_prediction_tokens}, " + f"audio_tokens={usage.completion_tokens_details.audio_tokens}, " + f"reasoning_tokens={usage.completion_tokens_details.reasoning_tokens}, " + f"rejected_prediction_tokens={usage.completion_tokens_details.rejected_prediction_tokens}, " + f"text_tokens={usage.completion_tokens_details.text_tokens}), " + f"prompt_tokens_details=PromptTokensDetailsWrapper(" + f"audio_tokens={usage.prompt_tokens_details.audio_tokens}, " + f"cached_tokens={usage.prompt_tokens_details.cached_tokens}, " + f"text_tokens={usage.prompt_tokens_details.text_tokens}, " + f"image_tokens={usage.prompt_tokens_details.image_tokens})))" + ) else: - print(f"ModelResponseStream(id='{getattr(final_chunk, 'id', 'N/A')}', " - f"created={getattr(final_chunk, 'created', 'N/A')}, " - f"model='{getattr(final_chunk, 'model', self.get_current_model())}', " - f"object='{getattr(final_chunk, 'object', 'chat.completion.chunk')}', " - f"system_fingerprint='{getattr(final_chunk, 'system_fingerprint', 'N/A')}', " - f"choices=[StreamingChoices(finish_reason='{final_chunk.choices[0].finish_reason}', " - f"index=0, delta=Delta(provider_specific_fields=None, content=None, role=None, " - f"function_call=None, tool_calls=None, audio=None), logprobs=None)], " - f"provider_specific_fields=None)") - - + print( + f"ModelResponseStream(id='{getattr(final_chunk, 'id', 'N/A')}', " + f"created={getattr(final_chunk, 'created', 'N/A')}, " + f"model='{getattr(final_chunk, 'model', self.get_current_model())}', " + f"object='{getattr(final_chunk, 'object', 'chat.completion.chunk')}', " + f"system_fingerprint='{getattr(final_chunk, 'system_fingerprint', 'N/A')}', " + f"choices=[StreamingChoices(finish_reason='{final_chunk.choices[0].finish_reason}', " + f"index=0, delta=Delta(provider_specific_fields=None, content=None, role=None, " + f"function_call=None, tool_calls=None, audio=None), logprobs=None)], " + f"provider_specific_fields=None)" + ) + self.llm.stream = original_stream return complete_response else: self.llm.stream = original_stream return streaming_response - + elif self.streaming_on and hasattr(self.llm, "stream"): original_stream = self.llm.stream self.llm.stream = True diff --git a/swarms/structs/debate_with_judge.py b/swarms/structs/debate_with_judge.py index c77372e0..e3104198 100644 --- a/swarms/structs/debate_with_judge.py +++ b/swarms/structs/debate_with_judge.py @@ -332,7 +332,7 @@ class DebateWithJudge: str: The content of the final judge synthesis. """ return self.conversation.get_final_message_content() - + def batched_run(self, tasks: List[str]) -> List[str]: """ Run the debate with judge refinement process for a batch of tasks. diff --git a/swarms/structs/swarm_router.py b/swarms/structs/swarm_router.py index b078d970..92903f57 100644 --- a/swarms/structs/swarm_router.py +++ b/swarms/structs/swarm_router.py @@ -2,7 +2,16 @@ import concurrent.futures import json import os import traceback -from typing import Any, Callable, Dict, List, Literal, Optional, Union, get_args +from typing import ( + Any, + Callable, + Dict, + List, + Literal, + Optional, + Union, + get_args, +) from pydantic import BaseModel, Field diff --git a/tests/structs/test_agent_stream_token.py b/tests/structs/test_agent_stream_token.py index 5cd02207..0e146d75 100644 --- a/tests/structs/test_agent_stream_token.py +++ b/tests/structs/test_agent_stream_token.py @@ -1,8 +1,8 @@ from swarms.structs.agent import Agent agent = Agent( - model_name="gpt-4.1", - max_loops=1, + model_name="gpt-4.1", + max_loops=1, stream=True, )