diff --git a/changelog.md b/changelog.md
deleted file mode 100644
index 4cd80326..00000000
--- a/changelog.md
+++ /dev/null
@@ -1 +0,0 @@
-# 5.8.7
\ No newline at end of file
diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml
index f8c20923..4873ad7f 100644
--- a/docs/mkdocs.yml
+++ b/docs/mkdocs.yml
@@ -192,6 +192,7 @@ nav:
- Changelog:
- Swarms 5.6.8: "swarms/changelog/5_6_8.md"
- Swarms 5.8.1: "swarms/changelog/5_8_1.md"
+ - Swarms 5.9.2: "swarms/changelog/changelog_new.md"
- Swarm Models:
- Overview: "swarms/models/index.md"
# - Models Available: "swarms/models/index.md"
diff --git a/docs/swarms/changelog/changelog_new.md b/docs/swarms/changelog/changelog_new.md
new file mode 100644
index 00000000..adc92b02
--- /dev/null
+++ b/docs/swarms/changelog/changelog_new.md
@@ -0,0 +1,90 @@
+# 🚀 Swarms 5.9.2 Release Notes
+
+
+## 🎯 Major Features
+
+### Concurrent Agent Execution Suite
+We're excited to introduce a comprehensive suite of agent execution methods to supercharge your multi-agent workflows:
+
+- `run_agents_concurrently`: Execute multiple agents in parallel with optimal resource utilization
+- `run_agents_concurrently_async`: Asynchronous execution for improved performance
+- `run_single_agent`: Streamlined single agent execution
+- `run_agents_concurrently_multiprocess`: Multi-process execution for CPU-intensive tasks
+- `run_agents_sequentially`: Sequential execution with controlled flow
+- `run_agents_with_different_tasks`: Assign different tasks to different agents
+- `run_agent_with_timeout`: Time-bounded agent execution
+- `run_agents_with_resource_monitoring`: Monitor and manage resource usage
+
+## 📚 Documentation
+- Comprehensive documentation added for all new execution methods
+- Updated examples and usage patterns
+- Enhanced API reference
+
+## 🛠️ Improvements
+- Tree swarm implementation fixes
+- Workspace directory now automatically set to `agent_workspace`
+- Improved error handling and stability
+
+## Quick Start
+
+```python
+import os
+
+from swarm_models import OpenAIChat
+from swarms import (
+    Agent,
+    run_agents_concurrently,
+    run_agents_with_timeout,
+    run_agents_with_different_tasks,
+)
+
+# Initialize a shared LLM instance for all agents
+model = OpenAIChat(
+    api_key=os.getenv("OPENAI_API_KEY"),
+    model_name="gpt-4o-mini",
+    temperature=0.1,
+)
+
+# Initialize multiple agents
+agents = [
+ Agent(
+ agent_name=f"Analysis-Agent-{i}",
+ system_prompt="You are a financial analysis expert",
+ llm=model,
+ max_loops=1
+ )
+ for i in range(5)
+]
+
+# Run agents concurrently
+task = "Analyze the impact of rising interest rates on tech stocks"
+outputs = run_agents_concurrently(agents, task)
+
+# Example with timeout
+outputs_with_timeout = run_agents_with_timeout(
+ agents=agents,
+ task=task,
+ timeout=30.0,
+ batch_size=2
+)
+
+# Run different tasks
+task_pairs = [
+ (agents[0], "Analyze tech stocks"),
+ (agents[1], "Analyze energy stocks"),
+ (agents[2], "Analyze retail stocks")
+]
+different_outputs = run_agents_with_different_tasks(task_pairs)
+```
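+
+The remaining helpers follow the same pattern. Below is a minimal sketch that reuses the `agents` list and `task` from above, assuming these helpers are importable from the top-level `swarms` package like the ones imported earlier (the thresholds shown are the defaults):
+
+```python
+from swarms import (
+    run_agents_sequentially,
+    run_agents_with_resource_monitoring,
+)
+
+# Sequential baseline: run the same task through each agent one at a time
+sequential_outputs = run_agents_sequentially(agents, task)
+
+# Adaptive execution: monitor CPU and memory and back off above the thresholds
+monitored_outputs = run_agents_with_resource_monitoring(
+    agents,
+    task,
+    cpu_threshold=90.0,
+    memory_threshold=90.0,
+    check_interval=1.0,
+)
+```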
+
+## Installation
+```bash
+pip3 install -U swarms
+```
+
+## Coming Soon
+- 🌟 Auto Swarm Builder: Automatically construct and configure entire swarms from a single task specification (in development)
+- Auto Prompt Generator for thousands of agents (in development)
+
+## Community
+We believe in the power of community-driven development. Help us make Swarms better!
+
+- ⭐ Star our repository: https://github.com/kyegomez/swarms
+- 🔄 Fork the project and contribute your improvements
+- 🤝 Join our growing community of contributors
+
+## Bug Fixes
+- Fixed Tree Swarm implementation issues
+- Resolved workspace directory configuration problems
+- General stability improvements
+
+---
+
+For detailed documentation and examples, visit our [GitHub repository](https://github.com/kyegomez/swarms).
+
+Let's build the future of multi-agent systems together! 🚀
\ No newline at end of file
diff --git a/docs/swarms/structs/agent.md b/docs/swarms/structs/agent.md
index 915ff873..97ab465b 100644
--- a/docs/swarms/structs/agent.md
+++ b/docs/swarms/structs/agent.md
@@ -156,7 +156,6 @@ graph TD
| `save_to_yaml(file_path)` | Saves the agent to a YAML file. | `file_path` (str): Path to save the YAML file. | `agent.save_to_yaml("agent_config.yaml")` |
| `get_llm_parameters()` | Returns the parameters of the language model. | None | `llm_params = agent.get_llm_parameters()` |
| `save_state(file_path, *args, **kwargs)` | Saves the current state of the agent to a JSON file. | `file_path` (str): Path to save the JSON file.
`*args`, `**kwargs`: Additional arguments. | `agent.save_state("agent_state.json")` |
-| `load_state(file_path)` | Loads the state of the agent from a JSON file. | `file_path` (str): Path to the JSON file. | `agent.load_state("agent_state.json")` |
| `update_system_prompt(system_prompt)` | Updates the system prompt. | `system_prompt` (str): New system prompt. | `agent.update_system_prompt("New system instructions")` |
| `update_max_loops(max_loops)` | Updates the maximum number of loops. | `max_loops` (int): New maximum number of loops. | `agent.update_max_loops(5)` |
| `update_loop_interval(loop_interval)` | Updates the loop interval. | `loop_interval` (int): New loop interval. | `agent.update_loop_interval(2)` |
@@ -184,11 +183,9 @@ graph TD
| `check_available_tokens()` | Checks and returns the number of available tokens. | None | `available_tokens = agent.check_available_tokens()` |
| `tokens_checks()` | Performs token checks and returns available tokens. | None | `token_info = agent.tokens_checks()` |
| `truncate_string_by_tokens(input_string, limit)` | Truncates a string to fit within a token limit. | `input_string` (str): String to truncate.
`limit` (int): Token limit. | `truncated_string = agent.truncate_string_by_tokens("Long string", 100)` |
-| `if_tokens_exceeds_context_length()` | Checks if the number of tokens exceeds the context length. | None | `exceeds = agent.if_tokens_exceeds_context_length()` |
| `tokens_operations(input_string)` | Performs various token-related operations on the input string. | `input_string` (str): String to process. | `processed_string = agent.tokens_operations("Input string")` |
| `parse_function_call_and_execute(response)` | Parses a function call from the response and executes it. | `response` (str): Response containing the function call. | `result = agent.parse_function_call_and_execute(response)` |
| `activate_agentops()` | Activates AgentOps functionality. | None | `agent.activate_agentops()` |
-| `count_tokens_and_subtract_from_context_window(response, *args, **kwargs)` | Counts tokens in the response and adjusts the context window. | `response` (str): Response to process.
`*args`, `**kwargs`: Additional arguments. | `await agent.count_tokens_and_subtract_from_context_window(response)` |
| `llm_output_parser(response)` | Parses the output from the language model. | `response` (Any): Response from the LLM. | `parsed_response = agent.llm_output_parser(llm_output)` |
| `log_step_metadata(loop, task, response)` | Logs metadata for each step of the agent's execution. | `loop` (int): Current loop number.
`task` (str): Current task.
`response` (str): Agent's response. | `agent.log_step_metadata(1, "Analyze data", "Analysis complete")` |
| `to_dict()` | Converts the agent's attributes to a dictionary. | None | `agent_dict = agent.to_dict()` |
@@ -391,7 +388,7 @@ agent.save_state('saved_flow.json')
# Load the agent state
agent = Agent(llm=llm_instance, max_loops=5)
-agent.load_state('saved_flow.json')
+agent.load('saved_flow.json')
agent.run("Continue with the task")
```
@@ -537,7 +534,7 @@ print(agent.system_prompt)
4. Leverage `long_term_memory` for tasks that require persistent information.
5. Use `interactive` mode for real-time conversations and `dashboard` for monitoring.
6. Implement `sentiment_analysis` for applications requiring tone management.
-7. Utilize `autosave` and `save_state`/`load_state` methods for continuity across sessions.
+7. Utilize `autosave` and `save`/`load` methods for continuity across sessions.
8. Optimize token usage with `dynamic_context_window` and `tokens_checks` methods.
9. Use `concurrent` and `async` methods for performance-critical applications.
10. Regularly review and analyze feedback using the `analyze_feedback` method.
diff --git a/docs/swarms/structs/agent_docs_v1.md b/docs/swarms/structs/agent_docs_v1.md
index 4bc5a5c9..8cba120d 100644
--- a/docs/swarms/structs/agent_docs_v1.md
+++ b/docs/swarms/structs/agent_docs_v1.md
@@ -455,7 +455,7 @@ agent.save_state('saved_flow.json')
# Load the agent state
agent = Agent(llm=llm_instance, max_loops=5)
-agent.load_state('saved_flow.json')
+agent.load('saved_flow.json')
agent.run("Continue with the task")
```
diff --git a/example.py b/example.py
index e53c08b0..9ae74915 100644
--- a/example.py
+++ b/example.py
@@ -2,9 +2,6 @@ import os
from swarms import Agent
from swarm_models import OpenAIChat
-from swarms.prompts.finance_agent_sys_prompt import (
- FINANCIAL_AGENT_SYS_PROMPT,
-)
from dotenv import load_dotenv
load_dotenv()
@@ -20,9 +17,9 @@ model = OpenAIChat(
# Initialize the agent
agent = Agent(
agent_name="Financial-Analysis-Agent",
- system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
+ # system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
llm=model,
- max_loops=1,
+ max_loops=3,
autosave=True,
dashboard=False,
verbose=True,
@@ -31,7 +28,7 @@ agent = Agent(
user_name="swarms_corp",
retry_attempts=1,
context_length=200000,
- return_step_meta=False,
+ return_step_meta=True,
# output_type="json",
output_type="json", # "json", "dict", "csv" OR "string" soon "yaml" and
streaming_on=False,
diff --git a/new_prompt.py b/new_prompt.py
deleted file mode 100644
index 9375c8f3..00000000
--- a/new_prompt.py
+++ /dev/null
@@ -1,30 +0,0 @@
-from swarms import Prompt
-from swarm_models import OpenAIChat
-import os
-
-model = OpenAIChat(
- api_key=os.getenv("OPENAI_API_KEY"),
- model_name="gpt-4o-mini",
- temperature=0.1,
-)
-
-# Aggregator system prompt
-prompt_generator_sys_prompt = Prompt(
- name="prompt-generator-sys-prompt-o1",
- description="Generate the most reliable prompt for a specific problem",
- content="""
- Your purpose is to craft extremely reliable and production-grade system prompts for other agents.
-
- # Instructions
- - Understand the prompt required for the agent.
- - Utilize a combination of the most effective prompting strategies available, including chain of thought, many shot, few shot, and instructions-examples-constraints.
- - Craft the prompt by blending the most suitable prompting strategies.
- - Ensure the prompt is production-grade ready and educates the agent on how to reason and why to reason in that manner.
- - Provide constraints if necessary and as needed.
- - The system prompt should be extensive and cover a vast array of potential scenarios to specialize the agent.
- """,
- auto_generate_prompt=True,
- llm=model,
-)
-
-# print(prompt_generator_sys_prompt.get_prompt())
diff --git a/pyproject.toml b/pyproject.toml
index e027bd9c..cf823aa8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
-version = "5.9.2"
+version = "6.0.0"
description = "Swarms - Pytorch"
license = "MIT"
authors = ["Kye Gomez "]
diff --git a/swarms/schemas/agent_step_schemas.py b/swarms/schemas/agent_step_schemas.py
index f78ee485..a6186229 100644
--- a/swarms/schemas/agent_step_schemas.py
+++ b/swarms/schemas/agent_step_schemas.py
@@ -8,6 +8,7 @@ from pydantic import BaseModel, Field
from swarms.schemas.base_schemas import (
AgentChatCompletionResponse,
)
+from typing import Union
def get_current_time():
@@ -56,7 +57,7 @@ class ManySteps(BaseModel):
description="The ID of the task this step belongs to.",
examples=["50da533e-3904-4401-8a07-c49adf88b5eb"],
)
- steps: Optional[List[Step]] = Field(
+ steps: Optional[List[Union[Step, Any]]] = Field(
[],
description="The steps of the task.",
)
diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py
index b8915583..09b54c7a 100644
--- a/swarms/structs/__init__.py
+++ b/swarms/structs/__init__.py
@@ -76,7 +76,6 @@ from swarms.structs.multi_agent_exec import (
run_agents_with_different_tasks,
run_agent_with_timeout,
run_agents_with_resource_monitoring,
-
)
__all__ = [
diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py
index 4a467095..e70ac0a1 100644
--- a/swarms/structs/agent.py
+++ b/swarms/structs/agent.py
@@ -21,12 +21,16 @@ from typing import (
import toml
import yaml
+from clusterops import (
+ execute_on_gpu,
+ execute_with_cpu_cores,
+)
from loguru import logger
from pydantic import BaseModel
-from swarms_memory import BaseVectorDatabase
+from swarm_models.tiktoken_wrapper import TikTokenizer
from termcolor import colored
-from swarm_models.tiktoken_wrapper import TikTokenizer
+from swarms.agents.ape_agent import auto_generate_prompt
from swarms.prompts.agent_system_prompts import AGENT_SYSTEM_PROMPT_3
from swarms.prompts.multi_modal_autonomous_instruction_prompt import (
MULTI_MODAL_AUTO_AGENT_SYSTEM_PROMPT_1,
@@ -40,23 +44,14 @@ from swarms.schemas.base_schemas import (
)
from swarms.structs.concat import concat_strings
from swarms.structs.conversation import Conversation
-from swarms.structs.yaml_model import YamlModel
from swarms.tools.base_tool import BaseTool
from swarms.tools.func_calling_utils import (
prepare_output_for_output_model,
)
-from swarms.tools.prebuilt.code_executor import CodeExecutor
from swarms.tools.tool_parse_exec import parse_and_execute_json
from swarms.utils.data_to_text import data_to_text
from swarms.utils.file_processing import create_file_in_folder
-from swarms.utils.parse_code import extract_code_from_markdown
from swarms.utils.pdf_to_text import pdf_to_text
-from swarms.utils.run_on_cpu import run_on_cpu
-from clusterops import (
- execute_on_gpu,
- execute_with_cpu_cores,
-)
-from swarms.agents.ape_agent import auto_generate_prompt
# Utils
@@ -85,7 +80,7 @@ def exists(val):
# Agent output types
# agent_output_type = Union[BaseModel, dict, str]
agent_output_type = Literal[
- "string", "str", "list", "json", "dict", "yaml"
+ "string", "str", "list", "json", "dict", "yaml", "json_schema"
]
ToolUsageType = Union[BaseModel, Dict[str, Any]]
@@ -198,11 +193,8 @@ class Agent:
interactive_run: Run the agent in interactive mode
streamed_generation: Stream the generation of the response
save_state: Save the state
- load_state: Load the state
truncate_history: Truncate the history
add_task_to_memory: Add the task to the memory
- add_message_to_memory: Add the message to the memory
- add_message_to_memory_and_truncate: Add the message to the memory and truncate
print_dashboard: Print the dashboard
loop_count_print: Print the loop count
streaming: Stream the content
@@ -260,7 +252,7 @@ class Agent:
pdf_path: Optional[str] = None,
list_of_pdf: Optional[str] = None,
tokenizer: Optional[Any] = None,
- long_term_memory: Optional[BaseVectorDatabase] = None,
+ long_term_memory: Optional[Any] = None,
preset_stopping_token: Optional[bool] = False,
traceback: Optional[Any] = None,
traceback_handlers: Optional[Any] = None,
@@ -330,6 +322,8 @@ class Agent:
data_memory: Optional[Callable] = None,
load_yaml_path: str = None,
auto_generate_prompt: bool = False,
+ rag_every_loop: bool = False,
+ plan_enabled: bool = False,
*args,
**kwargs,
):
@@ -416,7 +410,6 @@ class Agent:
self.log_directory = log_directory
self.tool_system_prompt = tool_system_prompt
self.max_tokens = max_tokens
-
self.frequency_penalty = frequency_penalty
self.presence_penalty = presence_penalty
self.temperature = temperature
@@ -436,10 +429,21 @@ class Agent:
self.load_yaml_path = load_yaml_path
self.tokenizer = TikTokenizer()
self.auto_generate_prompt = auto_generate_prompt
+ self.rag_every_loop = rag_every_loop
+ self.plan_enabled = plan_enabled
+
+ # Initialize the short term memory
+ self.short_memory = Conversation(
+ system_prompt=system_prompt,
+ time_enabled=True,
+ user=user_name,
+ rules=rules,
+ *args,
+ **kwargs,
+ )
# Initialize the feedback
self.feedback = []
- self.step_pool = []
# Initialize the executor
self.executor = ThreadPoolExecutor(
@@ -472,17 +476,6 @@ class Agent:
if preset_stopping_token is not None:
self.stopping_token = ""
- # If the system prompt is provided then set the system prompt
- # Initialize the short term memory
- self.short_memory = Conversation(
- system_prompt=system_prompt,
- time_enabled=True,
- user=user_name,
- rules=rules,
- *args,
- **kwargs,
- )
-
# # Check the parameters
# # Telemetry Processor to log agent data
# threading.Thread(target=self.agent_initialization()).start
@@ -561,10 +554,10 @@ class Agent:
# run_id=run_id,
task="",
max_loops=self.max_loops,
- steps=self.step_pool,
- full_history=self.short_memory.return_history_as_string(),
+ steps=self.short_memory.to_dict(),
+ full_history=self.short_memory.get_str(),
total_tokens=self.tokenizer.count_tokens(
- self.short_memory.return_history_as_string()
+ self.short_memory.get_str()
),
stopping_token=self.stopping_token,
interactive=self.interactive,
@@ -576,39 +569,48 @@ class Agent:
def check_if_no_prompt_then_autogenerate(self, task: str = None):
"""
- Checks if a system prompt is not set and auto_generate_prompt is enabled. If so, it auto-generates a prompt based on the agent's name, description, or the task if both are missing.
+ Checks if auto_generate_prompt is enabled and generates a prompt by combining agent name, description and system prompt if available.
+ Falls back to task if all other fields are missing.
Args:
- task (str, optional): The task to use as a fallback if both name and description are missing. Defaults to None.
+ task (str, optional): The task to use as a fallback if name, description and system prompt are missing. Defaults to None.
"""
- if (
- self.system_prompt is None
- and self.auto_generate_prompt is True
- ):
- if self.description:
- prompt = auto_generate_prompt(
- self.description, self.llm
- )
- elif self.agent_name:
- logger.info(
- "Description is missing. Using agent name as a fallback."
+ if self.auto_generate_prompt is True:
+ # Collect all available prompt components
+ components = []
+
+ if self.agent_name:
+ components.append(self.agent_name)
+
+ if self.agent_description:
+ components.append(self.agent_description)
+
+ if self.system_prompt:
+ components.append(self.system_prompt)
+
+ # If no components available, fall back to task
+ if not components and task:
+ logger.warning(
+ "No agent details found. Using task as fallback for prompt generation."
)
- prompt = auto_generate_prompt(
- self.agent_name, self.llm
+ self.system_prompt = auto_generate_prompt(
+ task, self.llm
)
else:
- logger.warning(
- "Both description and agent name are missing. Using task as a fallback."
+ # Combine all available components
+ combined_prompt = " ".join(components)
+ logger.info(
+ f"Auto-generating prompt from: {', '.join(components)}"
)
- prompt = auto_generate_prompt(task, self.llm)
- self.system_prompt = prompt
- logger.info("Auto-generated prompt successfully.")
- else:
- if self.system_prompt is not None:
self.system_prompt = auto_generate_prompt(
- self.system_prompt, self.llm
+ combined_prompt, self.llm
+ )
+ self.short_memory.add(
+ role="system", content=self.system_prompt
)
+ logger.info("Auto-generated prompt successfully.")
+
def set_system_prompt(self, system_prompt: str):
"""Set the system prompt"""
self.system_prompt = system_prompt
@@ -674,29 +676,6 @@ class Agent:
)
)
- def format_prompt(self, template, **kwargs: Any) -> str:
- """Format the template with the provided kwargs using f-string interpolation."""
- return template.format(**kwargs)
-
- def add_message_to_memory(self, message: str, *args, **kwargs):
- """Add the message to the memory"""
- try:
- logger.info(f"Adding message to memory: {message}")
- self.short_memory.add(
- role=self.agent_name, content=message, *args, **kwargs
- )
- except Exception as error:
- print(
- colored(
- f"Error adding message to memory: {error}", "red"
- )
- )
-
- # def add_message_to_memory_and_truncate(self, message: str):
- # """Add the message to the memory and truncate"""
- # self.short_memory[-1].append(message)
- # self.truncate_history()
-
def print_dashboard(self):
"""Print dashboard"""
print(colored("Initializing Agent Dashboard...", "yellow"))
@@ -725,7 +704,9 @@ class Agent:
)
)
- def loop_count_print(self, loop_count, max_loops):
+ def loop_count_print(
+ self, loop_count: int, max_loops: int
+ ) -> None:
"""loop_count_print summary
Args:
@@ -735,6 +716,7 @@ class Agent:
print(colored(f"\nLoop {loop_count} of {max_loops}", "cyan"))
print("\n")
+ # Check parameters
def check_parameters(self):
if self.llm is None:
raise ValueError("Language model is not provided")
@@ -748,8 +730,7 @@ class Agent:
if self.context_length == 0:
raise ValueError("Context length is not provided")
- ########################## FUNCTION CALLING ##########################
- @run_on_cpu
+ # Main function
def _run(
self,
task: Optional[str] = None,
@@ -759,7 +740,21 @@ class Agent:
**kwargs,
) -> Any:
"""
- Run the autonomous agent loop
+        Run the agent loop.
+
+ Args:
+ task (str): The task to be performed.
+ img (str): The image to be processed.
+ is_last (bool): Indicates if this is the last task.
+
+ Returns:
+ Any: The output of the agent.
+ (string, list, json, dict, yaml)
+
+ Examples:
+ agent(task="What is the capital of France?")
+ agent(task="What is the capital of France?", img="path/to/image.jpg")
+ agent(task="What is the capital of France?", img="path/to/image.jpg", is_last=True)
"""
try:
self.check_if_no_prompt_then_autogenerate(task)
@@ -769,20 +764,25 @@ class Agent:
# Add task to memory
self.short_memory.add(role=self.user_name, content=task)
+ # Plan
+ if self.plan_enabled is True:
+ self.plan(task)
+
# Set the loop count
loop_count = 0
-
# Clear the short memory
response = None
all_responses = []
+ # Query the long term memory first for the context
+ if self.long_term_memory is not None:
+ self.memory_query(task)
+
while (
self.max_loops == "auto"
or loop_count < self.max_loops
):
loop_count += 1
- # Log step start
- current_step_id = f"step_{loop_count}_{uuid.uuid4().hex}"
self.loop_count_print(loop_count, self.max_loops)
print("\n")
@@ -800,9 +800,12 @@ class Agent:
success = False
while attempt < self.retry_attempts and not success:
try:
- if self.long_term_memory is not None:
+ if (
+ self.long_term_memory is not None
+ and self.rag_every_loop is True
+ ):
logger.info(
- "Querying long term memory..."
+ "Querying RAG database for context..."
)
self.memory_query(task_prompt)
@@ -816,8 +819,15 @@ class Agent:
*response_args, **kwargs
)
- # Log step metadata
- step_meta = self.log_step_metadata(loop_count, task_prompt, response)
+ # Convert to a str if the response is not a str
+ response = self.llm_output_parser(response)
+
+ # Print
+ if self.streaming_on is True:
+ self.stream_response(response)
+ else:
+ logger.info(f"Response: {response}")
+
# Check if response is a dictionary and has 'choices' key
if (
isinstance(response, dict)
@@ -836,33 +846,14 @@ class Agent:
# Check and execute tools
if self.tools is not None:
- tool_result = self.parse_and_execute_tools(response)
- if tool_result:
- self.update_tool_usage(
- step_meta["step_id"],
- tool_result["tool"],
- tool_result["args"],
- tool_result["response"]
- )
-
-
- # Update agent output history
- self.agent_output.full_history = self.short_memory.return_history_as_string()
-
- # Log the step metadata
- logged = self.log_step_metadata(
- loop_count, task_prompt, response
- )
- logger.info(logged)
-
- # Convert to a str if the response is not a str
- response = self.llm_output_parser(response)
-
- # Print
- if self.streaming_on is True:
- self.stream_response(response)
- else:
- print(response)
+ self.parse_and_execute_tools(response)
+ # if tool_result:
+ # self.update_tool_usage(
+ # step_meta["step_id"],
+ # tool_result["tool"],
+ # tool_result["args"],
+ # tool_result["response"],
+ # )
# Add the response to the memory
self.short_memory.add(
@@ -872,32 +863,7 @@ class Agent:
# Add to all responses
all_responses.append(response)
- # TODO: Implement reliability check
- if self.tools is not None:
- # self.parse_function_call_and_execute(response)
- self.parse_and_execute_tools(response)
- # if self.code_interpreter is True:
- # # Parse the code and execute
- # logger.info("Parsing code and executing...")
- # code = extract_code_from_markdown(response)
-
- # output = self.code_executor.execute(code)
-
- # # Add to memory
- # self.short_memory.add(
- # role=self.agent_name, content=output
- # )
-
- # # Run the llm on the output
- # response = self.llm(
- # self.short_memory.return_history_as_string()
- # )
-
- # # Add to all responses
- # all_responses.append(response)
- # self.short_memory.add(
- # role=self.agent_name, content=response
- # )
+ # # TODO: Implement reliability check
if self.evaluator:
logger.info("Evaluating response...")
@@ -909,19 +875,15 @@ class Agent:
f" {evaluated_response}"
)
self.short_memory.add(
- role=self.agent_name,
+ role="Evaluator",
content=evaluated_response,
)
- # all_responses.append(evaluated_response)
-
# Sentiment analysis
if self.sentiment_analyzer:
logger.info("Analyzing sentiment...")
self.sentiment_analysis_handler(response)
- # print(response)
-
success = True # Mark as successful to exit the retry loop
except Exception as e:
@@ -938,9 +900,7 @@ class Agent:
)
break # Exit the loop if all retry attempts fail
- # # Check stopping conditions
- # if self.stopping_token in response:
- # break
+ # Check stopping conditions
if (
self.stopping_condition is not None
and self._check_stopping_condition(response)
@@ -978,7 +938,7 @@ class Agent:
if self.autosave is True:
logger.info("Autosaving agent state.")
- self.save_state(self.saved_state_path)
+ self.save_state()
# Apply the cleaner function to the response
if self.output_cleaner is not None:
@@ -987,8 +947,11 @@ class Agent:
logger.info(
f"Response after output cleaner: {response}"
)
+ self.short_memory.add(
+ role="Output Cleaner",
+ content=response,
+ )
- # print(response)
if self.agent_ops_on is True and is_last is True:
self.check_end_session_agentops()
@@ -999,9 +962,21 @@ class Agent:
if response is not None
]
- # return self.agent_output_type(all_responses)
+ self.agent_output.steps = self.short_memory.to_dict()
+ self.agent_output.full_history = (
+ self.short_memory.get_str()
+ )
+ self.agent_output.total_tokens = (
+ self.tokenizer.count_tokens(
+ self.short_memory.get_str()
+ )
+ )
+
# More flexible output types
- if self.output_type == "string":
+ if (
+ self.output_type == "string"
+ or self.output_type == "str"
+ ):
return concat_strings(all_responses)
elif self.output_type == "list":
return all_responses
@@ -1017,6 +992,10 @@ class Agent:
return yaml.safe_dump(
self.agent_output.model_dump(), sort_keys=False
)
+ elif self.return_step_meta is True:
+ return self.agent_output.model_dump_json(indent=4)
+ elif self.return_history is True:
+ return self.short_memory.get_str()
else:
raise ValueError(
f"Invalid output type: {self.output_type}"
@@ -1103,9 +1082,6 @@ class Agent:
return output.getvalue()
def parse_and_execute_tools(self, response: str, *args, **kwargs):
- # Extract json from markdown
- # response = extract_code_from_markdown(response)
-
# Try executing the tool
if self.execute_tool is not False:
try:
@@ -1120,59 +1096,19 @@ class Agent:
**kwargs,
)
- print(f"Tool Output: {out}")
+ out = str(out)
+
+ logger.info(f"Tool Output: {out}")
# Add the output to the memory
self.short_memory.add(
- role=self.agent_name,
+ role="Tool Executor",
content=out,
)
except Exception as error:
logger.error(f"Error executing tool: {error}")
- print(
- colored(
- f"Error executing tool: {error}",
- "red",
- )
- )
-
- # def long_term_memory_prompt(self, query: str, *args, **kwargs):
- # """
- # Generate the agent long term memory prompt
-
- # Args:
- # system_prompt (str): The system prompt
- # history (List[str]): The history of the conversation
-
- # Returns:
- # str: The agent history prompt
- # """
- # try:
- # logger.info(f"Querying long term memory database for {query}")
- # ltr = self.long_term_memory.query(query, *args, **kwargs)
-
- # # Count the tokens
- # logger.info("Couting tokens of retrieved document")
- # ltr_count = self.tokenizer.count_tokens(ltr)
- # logger.info(f"Retrieved document token count {ltr_count}")
-
- # if ltr_count > self.memory_chunk_size:
- # logger.info(
- # f"Truncating memory by {self.memory_chunk_size}"
- # )
- # out = self.truncate_string_by_tokens(
- # ltr, self.memory_chunk_size
- # )
- # logger.info(
- # f"Memory truncated by {self.memory_chunk_size}"
- # )
-
- # # Retrieve only the chunk size of the memory
- # return out
- # except Exception as error:
- # logger.error(f"Error querying long term memory: {error}")
- # raise error
+ raise error
def add_memory(self, message: str):
"""Add a memory to the agent
@@ -1188,7 +1124,7 @@ class Agent:
role=self.agent_name, content=message
)
- def plan(self, task: str, *args, **kwargs):
+ def plan(self, task: str, *args, **kwargs) -> None:
"""
Plan the task
@@ -1199,10 +1135,13 @@ class Agent:
if exists(self.planning_prompt):
# Join the plan and the task
planning_prompt = f"{self.planning_prompt} {task}"
- plan = self.llm(planning_prompt)
+ plan = self.llm(planning_prompt, *args, **kwargs)
+ logger.info(f"Plan: {plan}")
# Add the plan to the memory
- self.short_memory.add(role=self.agent_name, content=plan)
+ self.short_memory.add(
+ role=self.agent_name, content=str(plan)
+ )
return None
except Exception as error:
@@ -1273,37 +1212,93 @@ class Agent:
Args:
file_path (_type_): _description_
"""
+        # Prefer the configured saved_state_path, otherwise fall back to the agent name
+        file_path = (
+            f"{self.saved_state_path}.json"
+            if self.saved_state_path
+            else f"{self.agent_name}.json"
+        )
try:
create_file_in_folder(
self.workspace_dir,
- f"{self.saved_state_path}",
- self.to_dict(),
+ file_path,
+ self.to_json(),
)
- return "Saved agent history"
+ logger.info(f"Saved agent history to: {file_path}")
except Exception as error:
- print(
- colored(f"Error saving agent history: {error}", "red")
- )
+ logger.error(f"Error saving agent history: {error}")
+ raise error
- def load(self, file_path: str):
+ def load(self, file_path: str) -> None:
"""
- Load the agent history from a file.
+ Load the agent history from a file, excluding the LLM.
Args:
file_path (str): The path to the file containing the saved agent history.
+
+ Raises:
+ FileNotFoundError: If the specified file path does not exist
+ json.JSONDecodeError: If the file contains invalid JSON
+ AttributeError: If there are issues setting agent attributes
+ Exception: For other unexpected errors
"""
- with open(file_path, "r") as file:
- data = json.load(file)
+ try:
+            # Use the provided path, otherwise derive one from saved_state_path or the agent name
+            file_path = file_path or (
+                f"{self.saved_state_path}.json"
+                if self.saved_state_path
+                else f"{self.agent_name}.json"
+            )
- for key, value in data.items():
- setattr(self, key, value)
+ if not os.path.exists(file_path):
+ raise FileNotFoundError(
+ f"File not found at path: {file_path}"
+ )
- return "Loaded agent history"
+ with open(file_path, "r") as file:
+ try:
+ data = json.load(file)
+ except json.JSONDecodeError as e:
+ logger.error(
+ f"Invalid JSON in file {file_path}: {str(e)}"
+ )
+ raise
+
+ if not isinstance(data, dict):
+ raise ValueError(
+ f"Expected dict data but got {type(data)}"
+ )
+
+ # Store current LLM
+ current_llm = self.llm
+
+ try:
+ for key, value in data.items():
+ if key != "llm":
+ setattr(self, key, value)
+ except AttributeError as e:
+ logger.error(
+ f"Error setting agent attribute: {str(e)}"
+ )
+ raise
+
+ # Restore LLM
+ self.llm = current_llm
+
+ logger.info(
+ f"Successfully loaded agent history from: {file_path}"
+ )
+
+ except Exception as e:
+ logger.error(
+ f"Unexpected error loading agent history: {str(e)}"
+ )
+ raise
+
+ return None
def graceful_shutdown(self):
"""Gracefully shutdown the system saving the state"""
logger.info("Shutting down the system...")
- return self.save_state(f"{self.agent_name}.json")
+ return self.save()
def analyze_feedback(self):
"""Analyze the feedback for issues"""
@@ -1351,32 +1346,6 @@ class Agent:
logger.info(f"Adding response filter: {filter_word}")
self.reponse_filters.append(filter_word)
- def code_interpreter_execution(
- self, code: str, *args, **kwargs
- ) -> str:
- # Extract code from markdown
- extracted_code = extract_code_from_markdown(code)
-
- # Execute the code
- execution = CodeExecutor().execute(extracted_code)
-
- # Add the execution to the memory
- self.short_memory.add(
- role=self.agent_name,
- content=execution,
- )
-
- # Run the llm again
- response = self.llm(
- self.short_memory.return_history_as_string(),
- *args,
- **kwargs,
- )
-
- print(f"Response after code interpretation: {response}")
-
- return response
-
def apply_reponse_filters(self, response: str) -> str:
"""
Apply the response filters to the response
@@ -1412,14 +1381,15 @@ class Agent:
with open(file_path, "w") as f:
yaml.dump(self.to_dict(), f)
except Exception as error:
- print(
+ logger.error(
colored(f"Error saving agent to YAML: {error}", "red")
)
+ raise error
def get_llm_parameters(self):
return str(vars(self.llm))
- def save_state(self, file_path: str, *args, **kwargs) -> None:
+ def save_state(self, *args, **kwargs) -> None:
"""
Saves the current state of the agent to a JSON file, including the llm parameters.
@@ -1430,45 +1400,11 @@ class Agent:
>>> agent.save_state('saved_flow.json')
"""
try:
- logger.info(
- f"Saving Agent {self.agent_name} state to: {file_path}"
- )
-
- json_data = self.to_json()
-
- create_file_in_folder(
- self.workspace_dir,
- file_path,
- str(json_data),
- )
-
- # Log the saved state
- logger.info(f"Saved agent state to: {file_path}")
- except Exception as error:
- logger.info(f"Error saving agent state: {error}")
- raise error
-
- def load_state(self, file_path: str):
- """
- Loads the state of the agent from a json file and restores the configuration and memory.
-
-
- Example:
- >>> agent = Agent(llm=llm_instance, max_loops=5)
- >>> agent.load_state('saved_flow.json')
- >>> agent.run("Continue with the task")
-
- """
- try:
- with open(file_path, "r") as file:
- data = json.load(file)
-
- for key, value in data.items():
- setattr(self, key, value)
-
- logger.info(f"Agent state loaded from {file_path}")
+ logger.info(f"Saving Agent {self.agent_name}")
+ self.save()
+ logger.info("Saved agent state")
except Exception as error:
- logger.info(f"Error loading agent state: {error}")
+ logger.error(f"Error saving agent state: {error}")
raise error
def update_system_prompt(self, system_prompt: str):
@@ -1551,15 +1487,48 @@ class Agent:
raise error
def add_tool(self, tool: Callable):
+ """Add a single tool to the agent's tools list.
+
+ Args:
+ tool (Callable): The tool function to add
+
+ Returns:
+ The result of appending the tool to the tools list
+ """
+ logger.info(f"Adding tool: {tool.__name__}")
return self.tools.append(tool)
def add_tools(self, tools: List[Callable]):
+ """Add multiple tools to the agent's tools list.
+
+ Args:
+ tools (List[Callable]): List of tool functions to add
+
+ Returns:
+ The result of extending the tools list
+ """
+ logger.info(f"Adding tools: {[t.__name__ for t in tools]}")
return self.tools.extend(tools)
def remove_tool(self, tool: Callable):
+ """Remove a single tool from the agent's tools list.
+
+ Args:
+ tool (Callable): The tool function to remove
+
+ Returns:
+ The result of removing the tool from the tools list
+ """
+ logger.info(f"Removing tool: {tool.__name__}")
return self.tools.remove(tool)
def remove_tools(self, tools: List[Callable]):
+ """Remove multiple tools from the agent's tools list.
+
+ Args:
+ tools (List[Callable]): List of tool functions to remove
+ """
+ logger.info(f"Removing tools: {[t.__name__ for t in tools]}")
for tool in tools:
self.tools.remove(tool)
@@ -1599,7 +1568,7 @@ class Agent:
"Could not import agentops, try installing agentops: $ pip3 install agentops"
)
- def memory_query(self, task: str = None, *args, **kwargs) -> str:
+ def memory_query(self, task: str = None, *args, **kwargs) -> None:
try:
# Query the long term memory
if self.long_term_memory is not None:
@@ -1608,32 +1577,29 @@ class Agent:
task, *args, **kwargs
)
+ memory_retrieval = (
+ f"Documents Available: {str(memory_retrieval)}"
+ )
+
+ # Count the tokens
memory_token_count = self.tokenizer.count_tokens(
memory_retrieval
)
-
if memory_token_count > self.memory_chunk_size:
# Truncate the memory by the memory chunk size
memory_retrieval = self.truncate_string_by_tokens(
memory_retrieval, self.memory_chunk_size
)
- # Merge the task prompt with the memory retrieval
- task_prompt = (
- f"{task} Documents Available: {memory_retrieval}"
- )
-
- response = self.llm(task_prompt, *args, **kwargs)
- print(response)
-
self.short_memory.add(
- role=self.agent_name, content=response
+ role="Database",
+ content=memory_retrieval,
)
- return response
+ return None
except Exception as e:
- print(f"An error occurred: {e}")
- return None
+ logger.error(f"An error occurred: {e}")
+ raise e
def sentiment_analysis_handler(self, response: str = None):
"""
@@ -1829,26 +1795,6 @@ class Agent:
else:
return input_string
- def if_tokens_exceeds_context_length(self):
- # Check if tokens exceeds the context length
- try:
- tokens_used = self.tokenizer.count_tokens(
- self.short_memory.return_history_as_string()
- )
- if tokens_used > self.context_length:
- logger.warning(
- "Tokens used exceeds the context length."
- )
- logger.info(
- f"Tokens available: {tokens_used - self.context_length}"
- )
- return True
- else:
- return False
- except Exception as e:
- logger.error(f"Error checking tokens: {e}")
- return None
-
def tokens_operations(self, input_string: str) -> str:
"""
Perform various operations on tokens of an input string.
@@ -1936,27 +1882,7 @@ class Agent:
"Could not import agentops, try installing agentops: $ pip3 install agentops"
)
- async def count_tokens_and_subtract_from_context_window(
- self, response: str, *args, **kwargs
- ):
- """
- Count the number of tokens in the response and subtract it from the context window.
-
- Args:
- response (str): The response to count the tokens from.
-
- Returns:
- str: The response after counting the tokens and subtracting it from the context window.
- """
- # Count the number of tokens in the response
- tokens = self.tokenizer.count_tokens(response)
-
- # Subtract the number of tokens from the context window
- self.context_length -= len(tokens)
-
- return response
-
- def llm_output_parser(self, response):
+ def llm_output_parser(self, response: Any) -> str:
"""Parse the output from the LLM"""
try:
if isinstance(response, dict):
@@ -1986,45 +1912,71 @@ class Agent:
"""Log metadata for each step of agent execution."""
# Generate unique step ID
step_id = f"step_{loop}_{uuid.uuid4().hex}"
-
+
# Calculate token usage
# full_memory = self.short_memory.return_history_as_string()
# prompt_tokens = self.tokenizer.count_tokens(full_memory)
# completion_tokens = self.tokenizer.count_tokens(response)
# total_tokens = prompt_tokens + completion_tokens
- total_tokens=self.tokenizer.count_tokens(task) + self.tokenizer.count_tokens(response),
+ total_tokens = (
+ self.tokenizer.count_tokens(task)
+            + self.tokenizer.count_tokens(response)
+ )
- # Get memory responses
- memory_responses = {
- "short_term": self.short_memory.return_history_as_string() if self.short_memory else None,
- "long_term": self.long_term_memory.query(task) if self.long_term_memory else None
- }
+ # # Get memory responses
+ # memory_responses = {
+ # "short_term": (
+ # self.short_memory.return_history_as_string()
+ # if self.short_memory
+ # else None
+ # ),
+ # "long_term": (
+ # self.long_term_memory.query(task)
+ # if self.long_term_memory
+ # else None
+ # ),
+ # }
+
+ # # Get tool responses if tool was used
+ # if self.tools:
+ # try:
+ # tool_call_output = parse_and_execute_json(
+ # self.tools, response, parse_md=True
+ # )
+ # if tool_call_output:
+ # {
+ # "tool_name": tool_call_output.get(
+ # "tool_name", "unknown"
+ # ),
+ # "tool_args": tool_call_output.get("args", {}),
+ # "tool_output": str(
+ # tool_call_output.get("output", "")
+ # ),
+ # }
+ # except Exception as e:
+ # logger.debug(
+ # f"No tool call detected in response: {e}"
+ # )
- # Get tool responses if tool was used
- tool_response = None
- if self.tools:
- try:
- tool_call_output = parse_and_execute_json(self.tools, response, parse_md=True)
- if tool_call_output:
- tool_response = {
- "tool_name": tool_call_output.get("tool_name", "unknown"),
- "tool_args": tool_call_output.get("args", {}),
- "tool_output": str(tool_call_output.get("output", ""))
- }
- except Exception as e:
- logger.debug(f"No tool call detected in response: {e}")
-
# Create memory usage tracking
- memory_usage = {
- "short_term": len(self.short_memory.messages) if self.short_memory else 0,
- "long_term": self.long_term_memory.count if self.long_term_memory else 0,
- "responses": memory_responses
- }
-
+ # memory_usage = {
+ # "short_term": (
+ # len(self.short_memory.messages)
+ # if self.short_memory
+ # else 0
+ # ),
+ # "long_term": (
+ # self.long_term_memory.count
+ # if self.long_term_memory
+ # else 0
+ # ),
+ # "responses": memory_responses,
+ # }
+
step_log = Step(
step_id=step_id,
time=time.time(),
- tokens = total_tokens,
+ tokens=total_tokens,
response=AgentChatCompletionResponse(
id=self.agent_id,
agent_name=self.agent_name,
@@ -2042,28 +1994,39 @@ class Agent:
# completion_tokens=completion_tokens,
# total_tokens=total_tokens,
# ),
- tool_calls=[] if tool_response is None else [tool_response],
- memory_usage=memory_usage
+ # tool_calls=(
+ # [] if tool_response is None else [tool_response]
+ # ),
+ # memory_usage=None,
),
)
-
+
# Update total tokens if agent_output exists
- if hasattr(self, 'agent_output'):
- self.agent_output.total_tokens += self.response.total_tokens
-
-
+ # if hasattr(self, "agent_output"):
+ # self.agent_output.total_tokens += (
+ # self.response.total_tokens
+ # )
+
# Add step to agent output tracking
self.step_pool.append(step_log)
- def update_tool_usage(self, step_id: str, tool_name: str, tool_args: dict, tool_response: Any):
+ def update_tool_usage(
+ self,
+ step_id: str,
+ tool_name: str,
+ tool_args: dict,
+ tool_response: Any,
+ ):
"""Update tool usage information for a specific step."""
for step in self.agent_output.steps:
if step.step_id == step_id:
- step.response.tool_calls.append({
- "tool": tool_name,
- "arguments": tool_args,
- "response": str(tool_response)
- })
+ step.response.tool_calls.append(
+ {
+ "tool": tool_name,
+ "arguments": tool_args,
+ "response": str(tool_response),
+ }
+ )
break
def _serialize_callable(
@@ -2158,7 +2121,7 @@ class Agent:
create_file_in_folder(
self.workspace_dir,
f"{self.agent_name}.yaml",
- self.to_yaml(),
+ str(self.to_yaml()),
)
return f"Model saved to {self.workspace_dir}/{self.agent_name}.yaml"
@@ -2248,25 +2211,7 @@ class Agent:
role=self.user_name, content=self.sop
)
- def agent_output_type(self, responses: list):
- if self.output_type == "list":
- return responses
-
- elif self.output_type == "str" or "string":
- return concat_strings(responses)
-
- elif self.return_step_meta is True:
- return self.agent_output.model_dump_json(indent=4)
-
- elif self.output_type == "yaml":
- model = YamlModel()
- return model.dict_to_yaml(self.agent_output.model_dump())
-
- elif self.output_type == "dict":
- return self.agent_output.model_dump()
-
- elif self.return_history:
- return self.short_memory.return_history_as_string()
+ logger.info("SOP Uploaded into the memory")
def run(
self,
diff --git a/swarms/structs/conversation.py b/swarms/structs/conversation.py
index 8e516e9c..768c19c5 100644
--- a/swarms/structs/conversation.py
+++ b/swarms/structs/conversation.py
@@ -1,11 +1,11 @@
import datetime
import json
-from typing import Optional
+from typing import Any, Optional
+import yaml
from termcolor import colored
from swarms.structs.base_structure import BaseStructure
-from typing import Any
class Conversation(BaseStructure):
@@ -96,10 +96,10 @@ class Conversation(BaseStructure):
self.add("System: ", self.system_prompt)
if self.rules is not None:
- self.add(user, rules)
+ self.add("User", rules)
if custom_rules_prompt is not None:
- self.add(user, custom_rules_prompt)
+ self.add(user or "User", custom_rules_prompt)
# If tokenizer then truncate
if tokenizer is not None:
@@ -245,6 +245,9 @@ class Conversation(BaseStructure):
]
)
+ def get_str(self):
+ return self.return_history_as_string()
+
def save_as_json(self, filename: str = None):
"""Save the conversation history as a JSON file
@@ -379,3 +382,21 @@ class Conversation(BaseStructure):
def clear(self):
self.conversation_history = []
+
+ def to_json(self):
+ return json.dumps(self.conversation_history)
+
+ def to_dict(self):
+ return self.conversation_history
+
+ def to_yaml(self):
+ return yaml.dump(self.conversation_history)
+
+
+# # Example usage
+# conversation = Conversation()
+# conversation.add("user", "Hello, how are you?")
+# conversation.add("assistant", "I am doing well, thanks.")
+# # print(conversation.to_json())
+# print(type(conversation.to_dict()))
+# # print(conversation.to_yaml())
diff --git a/swarms/structs/multi_agent_exec.py b/swarms/structs/multi_agent_exec.py
index 36b7a81c..eec2288c 100644
--- a/swarms/structs/multi_agent_exec.py
+++ b/swarms/structs/multi_agent_exec.py
@@ -137,15 +137,18 @@ def run_agents_concurrently_multiprocess(
return results
+
@profile_func
-def run_agents_sequentially(agents: List[AgentType], task: str) -> List[Any]:
+def run_agents_sequentially(
+ agents: List[AgentType], task: str
+) -> List[Any]:
"""
Run multiple agents sequentially for baseline comparison.
-
+
Args:
agents: List of Agent instances to run
task: Task string to execute
-
+
Returns:
List of outputs from each agent
"""
@@ -154,44 +157,52 @@ def run_agents_sequentially(agents: List[AgentType], task: str) -> List[Any]:
@profile_func
def run_agents_with_different_tasks(
- agent_task_pairs: List[tuple[AgentType, str]],
+ agent_task_pairs: List[tuple[AgentType, str]],
batch_size: int = None,
- max_workers: int = None
+ max_workers: int = None,
) -> List[Any]:
"""
Run multiple agents with different tasks concurrently.
-
+
Args:
agent_task_pairs: List of (agent, task) tuples
batch_size: Number of agents to run in parallel
max_workers: Maximum number of threads
-
+
Returns:
List of outputs from each agent
"""
- async def run_pair_async(pair: tuple[AgentType, str], executor: ThreadPoolExecutor) -> Any:
+
+ async def run_pair_async(
+ pair: tuple[AgentType, str], executor: ThreadPoolExecutor
+ ) -> Any:
agent, task = pair
return await run_agent_async(agent, task, executor)
-
+
cpu_cores = cpu_count()
batch_size = batch_size or cpu_cores
max_workers = max_workers or cpu_cores * 2
results = []
-
+
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
-
+
with ThreadPoolExecutor(max_workers=max_workers) as executor:
for i in range(0, len(agent_task_pairs), batch_size):
batch = agent_task_pairs[i : i + batch_size]
batch_results = loop.run_until_complete(
- asyncio.gather(*(run_pair_async(pair, executor) for pair in batch))
+ asyncio.gather(
+ *(
+ run_pair_async(pair, executor)
+ for pair in batch
+ )
+ )
)
results.extend(batch_results)
-
+
return results
@@ -199,46 +210,46 @@ async def run_agent_with_timeout(
agent: AgentType,
task: str,
timeout: float,
- executor: ThreadPoolExecutor
+ executor: ThreadPoolExecutor,
) -> Any:
"""
Run an agent with a timeout limit.
-
+
Args:
agent: Agent instance to run
task: Task string to execute
timeout: Timeout in seconds
executor: ThreadPoolExecutor instance
-
+
Returns:
Agent execution result or None if timeout occurs
"""
try:
return await asyncio.wait_for(
- run_agent_async(agent, task, executor),
- timeout=timeout
+ run_agent_async(agent, task, executor), timeout=timeout
)
except asyncio.TimeoutError:
return None
+
@profile_func
def run_agents_with_timeout(
agents: List[AgentType],
task: str,
timeout: float,
batch_size: int = None,
- max_workers: int = None
+ max_workers: int = None,
) -> List[Any]:
"""
Run multiple agents concurrently with a timeout for each agent.
-
+
Args:
agents: List of Agent instances
task: Task string to execute
timeout: Timeout in seconds for each agent
batch_size: Number of agents to run in parallel
max_workers: Maximum number of threads
-
+
Returns:
List of outputs (None for timed out agents)
"""
@@ -246,28 +257,29 @@ def run_agents_with_timeout(
batch_size = batch_size or cpu_cores
max_workers = max_workers or cpu_cores * 2
results = []
-
+
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
-
+
with ThreadPoolExecutor(max_workers=max_workers) as executor:
for i in range(0, len(agents), batch_size):
batch = agents[i : i + batch_size]
batch_results = loop.run_until_complete(
asyncio.gather(
- *(run_agent_with_timeout(agent, task, timeout, executor)
- for agent in batch)
+ *(
+ run_agent_with_timeout(
+ agent, task, timeout, executor
+ )
+ for agent in batch
+ )
)
)
results.extend(batch_results)
-
- return results
-
-
+ return results
@dataclass
@@ -276,45 +288,52 @@ class ResourceMetrics:
memory_percent: float
active_threads: int
+
def get_system_metrics() -> ResourceMetrics:
"""Get current system resource usage"""
return ResourceMetrics(
cpu_percent=psutil.cpu_percent(),
memory_percent=psutil.virtual_memory().percent,
- active_threads=threading.active_count()
+ active_threads=threading.active_count(),
)
+
@profile_func
def run_agents_with_resource_monitoring(
agents: List[AgentType],
task: str,
cpu_threshold: float = 90.0,
memory_threshold: float = 90.0,
- check_interval: float = 1.0
+ check_interval: float = 1.0,
) -> List[Any]:
"""
Run agents with system resource monitoring and adaptive batch sizing.
-
+
Args:
agents: List of Agent instances
task: Task string to execute
cpu_threshold: Max CPU usage percentage
memory_threshold: Max memory usage percentage
check_interval: Resource check interval in seconds
-
+
Returns:
List of outputs from each agent
"""
+
async def monitor_resources():
while True:
metrics = get_system_metrics()
- if metrics.cpu_percent > cpu_threshold or metrics.memory_percent > memory_threshold:
+ if (
+ metrics.cpu_percent > cpu_threshold
+ or metrics.memory_percent > memory_threshold
+ ):
# Reduce batch size or pause execution
pass
await asyncio.sleep(check_interval)
-
+
# Implementation details...
-
+
+
# # Example usage:
# # Initialize your agents with the same model to avoid re-creating it
# agents = [
@@ -341,4 +360,3 @@ def run_agents_with_resource_monitoring(
# for i, output in enumerate(outputs):
# print(f"Output from agent {i+1}:\n{output}")
-
diff --git a/swarms/tools/tool_parse_exec.py b/swarms/tools/tool_parse_exec.py
index 15f04d07..8686781a 100644
--- a/swarms/tools/tool_parse_exec.py
+++ b/swarms/tools/tool_parse_exec.py
@@ -1,15 +1,17 @@
import json
-from typing import List
+from typing import List, Any, Callable, Union
from swarms.utils.loguru_logger import logger
from swarms.utils.parse_code import extract_code_from_markdown
def parse_and_execute_json(
- functions: List[callable] = None,
- json_string: str = None,
+ functions: List[Callable[..., Any]],
+ json_string: str,
parse_md: bool = False,
-):
+ verbose: bool = False,
+ return_str: bool = True,
+) -> Union[dict, str]:
"""
Parses and executes a JSON string containing function names and parameters.
@@ -17,191 +19,104 @@ def parse_and_execute_json(
functions (List[callable]): A list of callable functions.
json_string (str): The JSON string to parse and execute.
parse_md (bool): Flag indicating whether to extract code from Markdown.
-
+ verbose (bool): Flag indicating whether to enable verbose logging.
+ return_str (bool): Flag indicating whether to return a JSON string.
Returns:
- A dictionary containing the results of executing the functions with the parsed parameters.
-
+        Union[dict, str]: The results of executing the functions; returned as a JSON string when `return_str` is True, otherwise as a dictionary.
"""
+ if not functions or not json_string:
+ raise ValueError("Functions and JSON string are required")
+
if parse_md:
json_string = extract_code_from_markdown(json_string)
try:
- # Create a dictionary that maps function names to functions
+ # Create function name to function mapping
function_dict = {func.__name__: func for func in functions}
+ if verbose:
+ logger.info(
+ f"Available functions: {list(function_dict.keys())}"
+ )
+ logger.info(f"Processing JSON: {json_string}")
+
+ # Parse JSON data
data = json.loads(json_string)
- function_list = (
- data.get("functions", [])
- if data.get("functions")
- else [data.get("function", [])]
- )
+
+ # Handle both single function and function list formats
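+        # Accepted input shapes (illustrative):
+        #   {"functions": [{"name": "add", "parameters": {"a": 1, "b": 2}}]}
+        #   {"function": {"name": "add", "parameters": {"a": 1, "b": 2}}}
+        #   {"name": "add", "parameters": {"a": 1, "b": 2}}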
+ function_list = []
+ if "functions" in data:
+ function_list = data["functions"]
+ elif "function" in data:
+ function_list = [data["function"]]
+ else:
+ function_list = [
+ data
+ ] # Assume entire object is single function
+
+ # Ensure function_list is a list and filter None values
+ if isinstance(function_list, dict):
+ function_list = [function_list]
+ function_list = [f for f in function_list if f]
+
+ if verbose:
+ logger.info(f"Processing {len(function_list)} functions")
results = {}
for function_data in function_list:
function_name = function_data.get("name")
- parameters = function_data.get("parameters")
+ parameters = function_data.get("parameters", {})
- # Check if the function name is in the function dictionary
- if function_name in function_dict:
- # Call the function with the parsed parameters
- result = function_dict[function_name](**parameters)
- results[function_name] = str(result)
- else:
+ if not function_name:
+ logger.warning("Function data missing name field")
+ continue
+
+ if verbose:
+ logger.info(
+ f"Executing {function_name} with params: {parameters}"
+ )
+
+ if function_name not in function_dict:
+ logger.warning(f"Function {function_name} not found")
results[function_name] = None
+ continue
- return results
+ try:
+ result = function_dict[function_name](**parameters)
+ results[function_name] = str(result)
+ if verbose:
+ logger.info(
+ f"Result for {function_name}: {result}"
+ )
+ except Exception as e:
+ logger.error(
+ f"Error executing {function_name}: {str(e)}"
+ )
+ results[function_name] = f"Error: {str(e)}"
+
+ # Format final results
+ if len(results) == 1:
+ # Return single result directly
+ data = {"result": next(iter(results.values()))}
+ else:
+ # Return all results
+ data = {
+ "results": results,
+ "summary": "\n".join(
+ f"{k}: {v}" for k, v in results.items()
+ ),
+ }
+
+ if return_str:
+ return json.dumps(data)
+ else:
+ return data
+
+ except json.JSONDecodeError as e:
+ error = f"Invalid JSON format: {str(e)}"
+ logger.error(error)
+ return {"error": error}
except Exception as e:
- logger.error(f"Error parsing and executing JSON: {e}")
- return None
-
-
-# def parse_and_execute_json(
-# functions: List[Callable[..., Any]],
-# json_string: str = None,
-# parse_md: bool = False,
-# verbose: bool = False,
-# ) -> Dict[str, Any]:
-# """
-# Parses and executes a JSON string containing function names and parameters.
-
-# Args:
-# functions (List[Callable]): A list of callable functions.
-# json_string (str): The JSON string to parse and execute.
-# parse_md (bool): Flag indicating whether to extract code from Markdown.
-# verbose (bool): Flag indicating whether to enable verbose logging.
-
-# Returns:
-# Dict[str, Any]: A dictionary containing the results of executing the functions with the parsed parameters.
-# """
-# if parse_md:
-# json_string = extract_code_from_markdown(json_string)
-
-# logger.info("Number of functions: " + str(len(functions)))
-
-# try:
-# # Create a dictionary that maps function names to functions
-# function_dict = {func.__name__: func for func in functions}
-
-# data = json.loads(json_string)
-# function_list = data.get("functions") or [data.get("function")]
-
-# # Ensure function_list is a list and filter out None values
-# if isinstance(function_list, dict):
-# function_list = [function_list]
-# else:
-# function_list = [f for f in function_list if f]
-
-# results = {}
-
-# # Determine if concurrency is needed
-# concurrency = len(function_list) > 1
-
-# if concurrency:
-# with concurrent.futures.ThreadPoolExecutor() as executor:
-# future_to_function = {
-# executor.submit(
-# execute_and_log_function,
-# function_dict,
-# function_data,
-# verbose,
-# ): function_data
-# for function_data in function_list
-# }
-# for future in concurrent.futures.as_completed(
-# future_to_function
-# ):
-# function_data = future_to_function[future]
-# try:
-# result = future.result()
-# results.update(result)
-# except Exception as e:
-# if verbose:
-# logger.error(
-# f"Error executing function {function_data.get('name')}: {e}"
-# )
-# results[function_data.get("name")] = None
-# else:
-# for function_data in function_list:
-# function_name = function_data.get("name")
-# parameters = function_data.get("parameters")
-
-# if verbose:
-# logger.info(
-# f"Executing function: {function_name} with parameters: {parameters}"
-# )
-
-# if function_name in function_dict:
-# try:
-# result = function_dict[function_name](**parameters)
-# results[function_name] = str(result)
-# if verbose:
-# logger.info(
-# f"Result for function {function_name}: {result}"
-# )
-# except Exception as e:
-# if verbose:
-# logger.error(
-# f"Error executing function {function_name}: {e}"
-# )
-# results[function_name] = None
-# else:
-# if verbose:
-# logger.warning(
-# f"Function {function_name} not found."
-# )
-# results[function_name] = None
-
-# # Merge all results into a single string
-# merged_results = "\n".join(
-# f"{key}: {value}" for key, value in results.items()
-# )
-
-# return {"merged_results": merged_results}
-# except Exception as e:
-# logger.error(f"Error parsing and executing JSON: {e}")
-# return None
-
-
-# def execute_and_log_function(
-# function_dict: Dict[str, Callable],
-# function_data: Dict[str, Any],
-# verbose: bool,
-# ) -> Dict[str, Any]:
-# """
-# Executes a function from a given dictionary of functions and logs the execution details.
-
-# Args:
-# function_dict (Dict[str, Callable]): A dictionary containing the available functions.
-# function_data (Dict[str, Any]): A dictionary containing the function name and parameters.
-# verbose (bool): A flag indicating whether to log the execution details.
-
-# Returns:
-# Dict[str, Any]: A dictionary containing the function name and its result.
-
-# """
-# function_name = function_data.get("name")
-# parameters = function_data.get("parameters")
-
-# if verbose:
-# logger.info(
-# f"Executing function: {function_name} with parameters: {parameters}"
-# )
-
-# if function_name in function_dict:
-# try:
-# result = function_dict[function_name](**parameters)
-# if verbose:
-# logger.info(
-# f"Result for function {function_name}: {result}"
-# )
-# return {function_name: str(result)}
-# except Exception as e:
-# if verbose:
-# logger.error(
-# f"Error executing function {function_name}: {e}"
-# )
-# return {function_name: None}
-# else:
-# if verbose:
-# logger.warning(f"Function {function_name} not found.")
-# return {function_name: None}
+ error = f"Error parsing and executing JSON: {str(e)}"
+ logger.error(error)
+ return {"error": error}
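For reviewers, here is a minimal usage sketch of the return contract introduced by the patch above. It is illustrative only: the `add` and `greet` helpers and the JSON payload are invented for this note, and the keyword names (`functions`, `json_string`, `verbose`, `return_str`) are taken from the signature visible in this diff, so verify them against the installed release before relying on them.

```python
# Illustrative sketch, not part of the patch. Assumes the payload shape and
# keyword names shown in this diff; double-check against the released API.
import json

from swarms.tools.tool_parse_exec import parse_and_execute_json


def add(a: int, b: int) -> int:
    """Hypothetical tool: add two integers."""
    return a + b


def greet(name: str) -> str:
    """Hypothetical tool: build a greeting."""
    return f"Hello, {name}!"


# Tool-call payload: each entry carries a "name" and a "parameters" mapping.
payload = json.dumps(
    {
        "functions": [
            {"name": "add", "parameters": {"a": 2, "b": 3}},
            {"name": "greet", "parameters": {"name": "swarms"}},
        ]
    }
)

# With more than one call, the patched code returns {"results": ..., "summary": ...};
# a single call collapses to {"result": ...}. Unknown function names and raised
# exceptions are reported per entry instead of aborting the whole batch.
output = parse_and_execute_json(
    functions=[add, greet],
    json_string=payload,
    verbose=True,
    return_str=False,  # True would return json.dumps(...) of the same dict
)
print(output["summary"])
```
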
diff --git a/tests/agents/test_agent_logging.py b/tests/agents/test_agent_logging.py
index b2106dd6..1439935e 100644
--- a/tests/agents/test_agent_logging.py
+++ b/tests/agents/test_agent_logging.py
@@ -1,9 +1,5 @@
-from unittest.mock import Mock, MagicMock
-from dataclasses import dataclass, field, asdict
-from typing import List, Dict, Any
-from datetime import datetime
+from unittest.mock import MagicMock
import unittest
-from swarms.schemas.agent_step_schemas import ManySteps, Step
from swarms.structs.agent import Agent
from swarms.tools.tool_parse_exec import parse_and_execute_json
@@ -12,61 +8,80 @@ parse_and_execute_json = MagicMock()
parse_and_execute_json.return_value = {
"tool_name": "calculator",
"args": {"numbers": [2, 2]},
- "output": "4"
+ "output": "4",
}
+
class TestAgentLogging(unittest.TestCase):
def setUp(self):
self.mock_tokenizer = MagicMock()
self.mock_tokenizer.count_tokens.return_value = 100
-
+
self.mock_short_memory = MagicMock()
- self.mock_short_memory.get_memory_stats.return_value = {"message_count": 2}
-
+ self.mock_short_memory.get_memory_stats.return_value = {
+ "message_count": 2
+ }
+
self.mock_long_memory = MagicMock()
- self.mock_long_memory.get_memory_stats.return_value = {"item_count": 5}
-
+ self.mock_long_memory.get_memory_stats.return_value = {
+ "item_count": 5
+ }
+
self.agent = Agent(
tokenizer=self.mock_tokenizer,
short_memory=self.mock_short_memory,
- long_term_memory=self.mock_long_memory
+ long_term_memory=self.mock_long_memory,
)
def test_log_step_metadata_basic(self):
- log_result = self.agent.log_step_metadata(1, "Test prompt", "Test response")
-
- self.assertIn('step_id', log_result)
- self.assertIn('timestamp', log_result)
- self.assertIn('tokens', log_result)
- self.assertIn('memory_usage', log_result)
-
- self.assertEqual(log_result['tokens']['total'], 200)
+ log_result = self.agent.log_step_metadata(
+ 1, "Test prompt", "Test response"
+ )
+
+ self.assertIn("step_id", log_result)
+ self.assertIn("timestamp", log_result)
+ self.assertIn("tokens", log_result)
+ self.assertIn("memory_usage", log_result)
+
+ self.assertEqual(log_result["tokens"]["total"], 200)
def test_log_step_metadata_no_long_term_memory(self):
self.agent.long_term_memory = None
- log_result = self.agent.log_step_metadata(1, "prompt", "response")
- self.assertEqual(log_result['memory_usage']['long_term'], {})
-
+ log_result = self.agent.log_step_metadata(
+ 1, "prompt", "response"
+ )
+ self.assertEqual(log_result["memory_usage"]["long_term"], {})
+
def test_log_step_metadata_timestamp(self):
- log_result = self.agent.log_step_metadata(1, "prompt", "response")
- self.assertIn('timestamp', log_result)
+ log_result = self.agent.log_step_metadata(
+ 1, "prompt", "response"
+ )
+ self.assertIn("timestamp", log_result)
def test_token_counting_integration(self):
self.mock_tokenizer.count_tokens.side_effect = [150, 250]
- log_result = self.agent.log_step_metadata(1, "prompt", "response")
-
- self.assertEqual(log_result['tokens']['total'], 400)
+ log_result = self.agent.log_step_metadata(
+ 1, "prompt", "response"
+ )
+
+ self.assertEqual(log_result["tokens"]["total"], 400)
def test_agent_output_updating(self):
- initial_total_tokens = sum(step['tokens']['total'] for step in self.agent.agent_output.steps)
- self.agent.log_step_metadata(1, "prompt", "response")
-
- final_total_tokens = sum(step['tokens']['total'] for step in self.agent.agent_output.steps)
- self.assertEqual(
- final_total_tokens - initial_total_tokens,
- 200
- )
- self.assertEqual(len(self.agent.agent_output.steps), 1)
+ initial_total_tokens = sum(
+ step["tokens"]["total"]
+ for step in self.agent.agent_output.steps
+ )
+ self.agent.log_step_metadata(1, "prompt", "response")
+
+ final_total_tokens = sum(
+ step["tokens"]["total"]
+ for step in self.agent.agent_output.steps
+ )
+ self.assertEqual(
+ final_total_tokens - initial_total_tokens, 200
+ )
+ self.assertEqual(len(self.agent.agent_output.steps), 1)
+
class TestAgentLoggingIntegration(unittest.TestCase):
def setUp(self):
@@ -75,24 +90,25 @@ class TestAgentLoggingIntegration(unittest.TestCase):
def test_full_logging_cycle(self):
task = "Test task"
max_loops = 1
-
+
result = self.agent._run(task, max_loops=max_loops)
-
+
self.assertIsInstance(result, dict)
- self.assertIn('steps', result)
- self.assertIsInstance(result['steps'], list)
- self.assertEqual(len(result['steps']), max_loops)
-
- if result['steps']:
- step = result['steps'][0]
- self.assertIn('step_id', step)
- self.assertIn('timestamp', step)
- self.assertIn('task', step)
- self.assertIn('response', step)
- self.assertEqual(step['task'], task)
- self.assertEqual(step['response'], f"Response for loop 1")
-
+ self.assertIn("steps", result)
+ self.assertIsInstance(result["steps"], list)
+ self.assertEqual(len(result["steps"]), max_loops)
+
+ if result["steps"]:
+ step = result["steps"][0]
+ self.assertIn("step_id", step)
+ self.assertIn("timestamp", step)
+ self.assertIn("task", step)
+ self.assertIn("response", step)
+ self.assertEqual(step["task"], task)
+ self.assertEqual(step["response"], "Response for loop 1")
+
self.assertTrue(len(self.agent.agent_output.steps) > 0)
-if __name__ == '__main__':
+
+if __name__ == "__main__":
unittest.main()
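For quick reference, a hedged sketch of the per-step log record shape that the updated tests assert `Agent.log_step_metadata` returns. The field values are invented; only the keys and the token arithmetic (prompt tokens plus response tokens) come from the assertions above.

```python
# Illustrative only: values are made up; keys and the token total mirror the
# assertions in tests/agents/test_agent_logging.py. Only "long_term" is named
# explicitly inside "memory_usage"; other sub-keys are not asserted here.
example_step_log = {
    "step_id": "step-1",                  # asserted present
    "timestamp": "2024-01-01T00:00:00Z",  # asserted present
    "tokens": {"total": 200},             # 100 prompt tokens + 100 response tokens
    "memory_usage": {"long_term": {}},    # {} when long_term_memory is None
}
```
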
diff --git a/tests/structs/test_agent.py b/tests/structs/test_agent.py
index 4a145029..1661e354 100644
--- a/tests/structs/test_agent.py
+++ b/tests/structs/test_agent.py
@@ -541,7 +541,7 @@ def test_flow_load_state(flow_instance):
"max_loops": 10,
"autosave_path": "/path/to/load",
}
- flow_instance.load_state(state)
+ flow_instance.load(state)
assert flow_instance.get_current_prompt() == "Loaded prompt"
assert "Step 1" in flow_instance.get_instructions()
assert "User message 1" in flow_instance.get_user_messages()