From d73f1a68c432fd4a960d4545fcb9e0632b7b66c2 Mon Sep 17 00:00:00 2001 From: Your Name Date: Fri, 1 Nov 2024 11:07:18 -0400 Subject: [PATCH] [FEAT][Better RAG][OUTPUT JSON] --- changelog.md | 1 - docs/mkdocs.yml | 1 + docs/swarms/changelog/changelog_new.md | 90 +++ docs/swarms/structs/agent.md | 7 +- docs/swarms/structs/agent_docs_v1.md | 2 +- example.py | 9 +- new_prompt.py | 30 - pyproject.toml | 2 +- swarms/schemas/agent_step_schemas.py | 3 +- swarms/structs/__init__.py | 1 - swarms/structs/agent.py | 747 ++++++++++++------------- swarms/structs/conversation.py | 29 +- swarms/structs/multi_agent_exec.py | 92 +-- swarms/tools/tool_parse_exec.py | 265 +++------ tests/agents/test_agent_logging.py | 122 ++-- tests/structs/test_agent.py | 2 +- 16 files changed, 686 insertions(+), 717 deletions(-) delete mode 100644 changelog.md create mode 100644 docs/swarms/changelog/changelog_new.md delete mode 100644 new_prompt.py diff --git a/changelog.md b/changelog.md deleted file mode 100644 index 4cd80326..00000000 --- a/changelog.md +++ /dev/null @@ -1 +0,0 @@ -# 5.8.7 \ No newline at end of file diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index f8c20923..4873ad7f 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -192,6 +192,7 @@ nav: - Changelog: - Swarms 5.6.8: "swarms/changelog/5_6_8.md" - Swarms 5.8.1: "swarms/changelog/5_8_1.md" + - Swarms 5.9.2: "swarms/changelog/changelog_new.md" - Swarm Models: - Overview: "swarms/models/index.md" # - Models Available: "swarms/models/index.md" diff --git a/docs/swarms/changelog/changelog_new.md b/docs/swarms/changelog/changelog_new.md new file mode 100644 index 00000000..adc92b02 --- /dev/null +++ b/docs/swarms/changelog/changelog_new.md @@ -0,0 +1,90 @@ +# 🚀 Swarms 5.9.2 Release Notes + + +### 🎯 Major Features + +#### Concurrent Agent Execution Suite +We're excited to introduce a comprehensive suite of agent execution methods to supercharge your multi-agent workflows: + +- `run_agents_concurrently`: Execute multiple agents in parallel with optimal resource utilization +- `run_agents_concurrently_async`: Asynchronous execution for improved performance +- `run_single_agent`: Streamlined single agent execution +- `run_agents_concurrently_multiprocess`: Multi-process execution for CPU-intensive tasks +- `run_agents_sequentially`: Sequential execution with controlled flow +- `run_agents_with_different_tasks`: Assign different tasks to different agents +- `run_agent_with_timeout`: Time-bounded agent execution +- `run_agents_with_resource_monitoring`: Monitor and manage resource usage + +### 📚 Documentation +- Comprehensive documentation added for all new execution methods +- Updated examples and usage patterns +- Enhanced API reference + +### 🛠️ Improvements +- Tree swarm implementation fixes +- Workspace directory now automatically set to `agent_workspace` +- Improved error handling and stability + +## Quick Start + +```python +from swarms import Agent, run_agents_concurrently, run_agents_with_timeout, run_agents_with_different_tasks + +# Initialize multiple agents +agents = [ + Agent( + agent_name=f"Analysis-Agent-{i}", + system_prompt="You are a financial analysis expert", + llm=model, + max_loops=1 + ) + for i in range(5) +] + +# Run agents concurrently +task = "Analyze the impact of rising interest rates on tech stocks" +outputs = run_agents_concurrently(agents, task) + +# Example with timeout +outputs_with_timeout = run_agents_with_timeout( + agents=agents, + task=task, + timeout=30.0, + batch_size=2 +) + +# Run different tasks +task_pairs = [ + (agents[0], "Analyze tech stocks"), + (agents[1], "Analyze energy stocks"), + (agents[2], "Analyze retail stocks") +] +different_outputs = run_agents_with_different_tasks(task_pairs) +``` + +## Installation +```bash +pip3 install -U swarms +``` + +## Coming Soon +- 🌟 Auto Swarm Builder: Automatically construct and configure entire swarms from a single task specification (in development) +- Auto Prompt Generator for thousands of agents (in development) + +## Community +We believe in the power of community-driven development. Help us make Swarms better! + +- ⭐ Star our repository: https://github.com/kyegomez/swarms +- 🔄 Fork the project and contribute your improvements +- 🤝 Join our growing community of contributors + +## Bug Fixes +- Fixed Tree Swarm implementation issues +- Resolved workspace directory configuration problems +- General stability improvements + +--- + +For detailed documentation and examples, visit our [GitHub repository](https://github.com/kyegomez/swarms). + +Let's build the future of multi-agent systems together! 🚀 \ No newline at end of file diff --git a/docs/swarms/structs/agent.md b/docs/swarms/structs/agent.md index 915ff873..97ab465b 100644 --- a/docs/swarms/structs/agent.md +++ b/docs/swarms/structs/agent.md @@ -156,7 +156,6 @@ graph TD | `save_to_yaml(file_path)` | Saves the agent to a YAML file. | `file_path` (str): Path to save the YAML file. | `agent.save_to_yaml("agent_config.yaml")` | | `get_llm_parameters()` | Returns the parameters of the language model. | None | `llm_params = agent.get_llm_parameters()` | | `save_state(file_path, *args, **kwargs)` | Saves the current state of the agent to a JSON file. | `file_path` (str): Path to save the JSON file.
`*args`, `**kwargs`: Additional arguments. | `agent.save_state("agent_state.json")` | -| `load_state(file_path)` | Loads the state of the agent from a JSON file. | `file_path` (str): Path to the JSON file. | `agent.load_state("agent_state.json")` | | `update_system_prompt(system_prompt)` | Updates the system prompt. | `system_prompt` (str): New system prompt. | `agent.update_system_prompt("New system instructions")` | | `update_max_loops(max_loops)` | Updates the maximum number of loops. | `max_loops` (int): New maximum number of loops. | `agent.update_max_loops(5)` | | `update_loop_interval(loop_interval)` | Updates the loop interval. | `loop_interval` (int): New loop interval. | `agent.update_loop_interval(2)` | @@ -184,11 +183,9 @@ graph TD | `check_available_tokens()` | Checks and returns the number of available tokens. | None | `available_tokens = agent.check_available_tokens()` | | `tokens_checks()` | Performs token checks and returns available tokens. | None | `token_info = agent.tokens_checks()` | | `truncate_string_by_tokens(input_string, limit)` | Truncates a string to fit within a token limit. | `input_string` (str): String to truncate.
`limit` (int): Token limit. | `truncated_string = agent.truncate_string_by_tokens("Long string", 100)` | -| `if_tokens_exceeds_context_length()` | Checks if the number of tokens exceeds the context length. | None | `exceeds = agent.if_tokens_exceeds_context_length()` | | `tokens_operations(input_string)` | Performs various token-related operations on the input string. | `input_string` (str): String to process. | `processed_string = agent.tokens_operations("Input string")` | | `parse_function_call_and_execute(response)` | Parses a function call from the response and executes it. | `response` (str): Response containing the function call. | `result = agent.parse_function_call_and_execute(response)` | | `activate_agentops()` | Activates AgentOps functionality. | None | `agent.activate_agentops()` | -| `count_tokens_and_subtract_from_context_window(response, *args, **kwargs)` | Counts tokens in the response and adjusts the context window. | `response` (str): Response to process.
`*args`, `**kwargs`: Additional arguments. | `await agent.count_tokens_and_subtract_from_context_window(response)` | | `llm_output_parser(response)` | Parses the output from the language model. | `response` (Any): Response from the LLM. | `parsed_response = agent.llm_output_parser(llm_output)` | | `log_step_metadata(loop, task, response)` | Logs metadata for each step of the agent's execution. | `loop` (int): Current loop number.
`task` (str): Current task.
`response` (str): Agent's response. | `agent.log_step_metadata(1, "Analyze data", "Analysis complete")` | | `to_dict()` | Converts the agent's attributes to a dictionary. | None | `agent_dict = agent.to_dict()` | @@ -391,7 +388,7 @@ agent.save_state('saved_flow.json') # Load the agent state agent = Agent(llm=llm_instance, max_loops=5) -agent.load_state('saved_flow.json') +agent.load('saved_flow.json') agent.run("Continue with the task") ``` @@ -537,7 +534,7 @@ print(agent.system_prompt) 4. Leverage `long_term_memory` for tasks that require persistent information. 5. Use `interactive` mode for real-time conversations and `dashboard` for monitoring. 6. Implement `sentiment_analysis` for applications requiring tone management. -7. Utilize `autosave` and `save_state`/`load_state` methods for continuity across sessions. +7. Utilize `autosave` and `save`/`load` methods for continuity across sessions. 8. Optimize token usage with `dynamic_context_window` and `tokens_checks` methods. 9. Use `concurrent` and `async` methods for performance-critical applications. 10. Regularly review and analyze feedback using the `analyze_feedback` method. diff --git a/docs/swarms/structs/agent_docs_v1.md b/docs/swarms/structs/agent_docs_v1.md index 4bc5a5c9..8cba120d 100644 --- a/docs/swarms/structs/agent_docs_v1.md +++ b/docs/swarms/structs/agent_docs_v1.md @@ -455,7 +455,7 @@ agent.save_state('saved_flow.json') # Load the agent state agent = Agent(llm=llm_instance, max_loops=5) -agent.load_state('saved_flow.json') +agent.load('saved_flow.json') agent.run("Continue with the task") ``` diff --git a/example.py b/example.py index e53c08b0..9ae74915 100644 --- a/example.py +++ b/example.py @@ -2,9 +2,6 @@ import os from swarms import Agent from swarm_models import OpenAIChat -from swarms.prompts.finance_agent_sys_prompt import ( - FINANCIAL_AGENT_SYS_PROMPT, -) from dotenv import load_dotenv load_dotenv() @@ -20,9 +17,9 @@ model = OpenAIChat( # Initialize the agent agent = Agent( agent_name="Financial-Analysis-Agent", - system_prompt=FINANCIAL_AGENT_SYS_PROMPT, + # system_prompt=FINANCIAL_AGENT_SYS_PROMPT, llm=model, - max_loops=1, + max_loops=3, autosave=True, dashboard=False, verbose=True, @@ -31,7 +28,7 @@ agent = Agent( user_name="swarms_corp", retry_attempts=1, context_length=200000, - return_step_meta=False, + return_step_meta=True, # output_type="json", output_type="json", # "json", "dict", "csv" OR "string" soon "yaml" and streaming_on=False, diff --git a/new_prompt.py b/new_prompt.py deleted file mode 100644 index 9375c8f3..00000000 --- a/new_prompt.py +++ /dev/null @@ -1,30 +0,0 @@ -from swarms import Prompt -from swarm_models import OpenAIChat -import os - -model = OpenAIChat( - api_key=os.getenv("OPENAI_API_KEY"), - model_name="gpt-4o-mini", - temperature=0.1, -) - -# Aggregator system prompt -prompt_generator_sys_prompt = Prompt( - name="prompt-generator-sys-prompt-o1", - description="Generate the most reliable prompt for a specific problem", - content=""" - Your purpose is to craft extremely reliable and production-grade system prompts for other agents. - - # Instructions - - Understand the prompt required for the agent. - - Utilize a combination of the most effective prompting strategies available, including chain of thought, many shot, few shot, and instructions-examples-constraints. - - Craft the prompt by blending the most suitable prompting strategies. - - Ensure the prompt is production-grade ready and educates the agent on how to reason and why to reason in that manner. - - Provide constraints if necessary and as needed. - - The system prompt should be extensive and cover a vast array of potential scenarios to specialize the agent. - """, - auto_generate_prompt=True, - llm=model, -) - -# print(prompt_generator_sys_prompt.get_prompt()) diff --git a/pyproject.toml b/pyproject.toml index e027bd9c..cf823aa8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "5.9.2" +version = "6.0.0" description = "Swarms - Pytorch" license = "MIT" authors = ["Kye Gomez "] diff --git a/swarms/schemas/agent_step_schemas.py b/swarms/schemas/agent_step_schemas.py index f78ee485..a6186229 100644 --- a/swarms/schemas/agent_step_schemas.py +++ b/swarms/schemas/agent_step_schemas.py @@ -8,6 +8,7 @@ from pydantic import BaseModel, Field from swarms.schemas.base_schemas import ( AgentChatCompletionResponse, ) +from typing import Union def get_current_time(): @@ -56,7 +57,7 @@ class ManySteps(BaseModel): description="The ID of the task this step belongs to.", examples=["50da533e-3904-4401-8a07-c49adf88b5eb"], ) - steps: Optional[List[Step]] = Field( + steps: Optional[List[Union[Step, Any]]] = Field( [], description="The steps of the task.", ) diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index b8915583..09b54c7a 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -76,7 +76,6 @@ from swarms.structs.multi_agent_exec import ( run_agents_with_different_tasks, run_agent_with_timeout, run_agents_with_resource_monitoring, - ) __all__ = [ diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 4a467095..e70ac0a1 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -21,12 +21,16 @@ from typing import ( import toml import yaml +from clusterops import ( + execute_on_gpu, + execute_with_cpu_cores, +) from loguru import logger from pydantic import BaseModel -from swarms_memory import BaseVectorDatabase +from swarm_models.tiktoken_wrapper import TikTokenizer from termcolor import colored -from swarm_models.tiktoken_wrapper import TikTokenizer +from swarms.agents.ape_agent import auto_generate_prompt from swarms.prompts.agent_system_prompts import AGENT_SYSTEM_PROMPT_3 from swarms.prompts.multi_modal_autonomous_instruction_prompt import ( MULTI_MODAL_AUTO_AGENT_SYSTEM_PROMPT_1, @@ -40,23 +44,14 @@ from swarms.schemas.base_schemas import ( ) from swarms.structs.concat import concat_strings from swarms.structs.conversation import Conversation -from swarms.structs.yaml_model import YamlModel from swarms.tools.base_tool import BaseTool from swarms.tools.func_calling_utils import ( prepare_output_for_output_model, ) -from swarms.tools.prebuilt.code_executor import CodeExecutor from swarms.tools.tool_parse_exec import parse_and_execute_json from swarms.utils.data_to_text import data_to_text from swarms.utils.file_processing import create_file_in_folder -from swarms.utils.parse_code import extract_code_from_markdown from swarms.utils.pdf_to_text import pdf_to_text -from swarms.utils.run_on_cpu import run_on_cpu -from clusterops import ( - execute_on_gpu, - execute_with_cpu_cores, -) -from swarms.agents.ape_agent import auto_generate_prompt # Utils @@ -85,7 +80,7 @@ def exists(val): # Agent output types # agent_output_type = Union[BaseModel, dict, str] agent_output_type = Literal[ - "string", "str", "list", "json", "dict", "yaml" + "string", "str", "list", "json", "dict", "yaml", "json_schema" ] ToolUsageType = Union[BaseModel, Dict[str, Any]] @@ -198,11 +193,8 @@ class Agent: interactive_run: Run the agent in interactive mode streamed_generation: Stream the generation of the response save_state: Save the state - load_state: Load the state truncate_history: Truncate the history add_task_to_memory: Add the task to the memory - add_message_to_memory: Add the message to the memory - add_message_to_memory_and_truncate: Add the message to the memory and truncate print_dashboard: Print the dashboard loop_count_print: Print the loop count streaming: Stream the content @@ -260,7 +252,7 @@ class Agent: pdf_path: Optional[str] = None, list_of_pdf: Optional[str] = None, tokenizer: Optional[Any] = None, - long_term_memory: Optional[BaseVectorDatabase] = None, + long_term_memory: Optional[Any] = None, preset_stopping_token: Optional[bool] = False, traceback: Optional[Any] = None, traceback_handlers: Optional[Any] = None, @@ -330,6 +322,8 @@ class Agent: data_memory: Optional[Callable] = None, load_yaml_path: str = None, auto_generate_prompt: bool = False, + rag_every_loop: bool = False, + plan_enabled: bool = False, *args, **kwargs, ): @@ -416,7 +410,6 @@ class Agent: self.log_directory = log_directory self.tool_system_prompt = tool_system_prompt self.max_tokens = max_tokens - self.frequency_penalty = frequency_penalty self.presence_penalty = presence_penalty self.temperature = temperature @@ -436,10 +429,21 @@ class Agent: self.load_yaml_path = load_yaml_path self.tokenizer = TikTokenizer() self.auto_generate_prompt = auto_generate_prompt + self.rag_every_loop = rag_every_loop + self.plan_enabled = plan_enabled + + # Initialize the short term memory + self.short_memory = Conversation( + system_prompt=system_prompt, + time_enabled=True, + user=user_name, + rules=rules, + *args, + **kwargs, + ) # Initialize the feedback self.feedback = [] - self.step_pool = [] # Initialize the executor self.executor = ThreadPoolExecutor( @@ -472,17 +476,6 @@ class Agent: if preset_stopping_token is not None: self.stopping_token = "" - # If the system prompt is provided then set the system prompt - # Initialize the short term memory - self.short_memory = Conversation( - system_prompt=system_prompt, - time_enabled=True, - user=user_name, - rules=rules, - *args, - **kwargs, - ) - # # Check the parameters # # Telemetry Processor to log agent data # threading.Thread(target=self.agent_initialization()).start @@ -561,10 +554,10 @@ class Agent: # run_id=run_id, task="", max_loops=self.max_loops, - steps=self.step_pool, - full_history=self.short_memory.return_history_as_string(), + steps=self.short_memory.to_dict(), + full_history=self.short_memory.get_str(), total_tokens=self.tokenizer.count_tokens( - self.short_memory.return_history_as_string() + self.short_memory.get_str() ), stopping_token=self.stopping_token, interactive=self.interactive, @@ -576,39 +569,48 @@ class Agent: def check_if_no_prompt_then_autogenerate(self, task: str = None): """ - Checks if a system prompt is not set and auto_generate_prompt is enabled. If so, it auto-generates a prompt based on the agent's name, description, or the task if both are missing. + Checks if auto_generate_prompt is enabled and generates a prompt by combining agent name, description and system prompt if available. + Falls back to task if all other fields are missing. Args: - task (str, optional): The task to use as a fallback if both name and description are missing. Defaults to None. + task (str, optional): The task to use as a fallback if name, description and system prompt are missing. Defaults to None. """ - if ( - self.system_prompt is None - and self.auto_generate_prompt is True - ): - if self.description: - prompt = auto_generate_prompt( - self.description, self.llm - ) - elif self.agent_name: - logger.info( - "Description is missing. Using agent name as a fallback." + if self.auto_generate_prompt is True: + # Collect all available prompt components + components = [] + + if self.agent_name: + components.append(self.agent_name) + + if self.agent_description: + components.append(self.agent_description) + + if self.system_prompt: + components.append(self.system_prompt) + + # If no components available, fall back to task + if not components and task: + logger.warning( + "No agent details found. Using task as fallback for prompt generation." ) - prompt = auto_generate_prompt( - self.agent_name, self.llm + self.system_prompt = auto_generate_prompt( + task, self.llm ) else: - logger.warning( - "Both description and agent name are missing. Using task as a fallback." + # Combine all available components + combined_prompt = " ".join(components) + logger.info( + f"Auto-generating prompt from: {', '.join(components)}" ) - prompt = auto_generate_prompt(task, self.llm) - self.system_prompt = prompt - logger.info("Auto-generated prompt successfully.") - else: - if self.system_prompt is not None: self.system_prompt = auto_generate_prompt( - self.system_prompt, self.llm + combined_prompt, self.llm + ) + self.short_memory.add( + role="system", content=self.system_prompt ) + logger.info("Auto-generated prompt successfully.") + def set_system_prompt(self, system_prompt: str): """Set the system prompt""" self.system_prompt = system_prompt @@ -674,29 +676,6 @@ class Agent: ) ) - def format_prompt(self, template, **kwargs: Any) -> str: - """Format the template with the provided kwargs using f-string interpolation.""" - return template.format(**kwargs) - - def add_message_to_memory(self, message: str, *args, **kwargs): - """Add the message to the memory""" - try: - logger.info(f"Adding message to memory: {message}") - self.short_memory.add( - role=self.agent_name, content=message, *args, **kwargs - ) - except Exception as error: - print( - colored( - f"Error adding message to memory: {error}", "red" - ) - ) - - # def add_message_to_memory_and_truncate(self, message: str): - # """Add the message to the memory and truncate""" - # self.short_memory[-1].append(message) - # self.truncate_history() - def print_dashboard(self): """Print dashboard""" print(colored("Initializing Agent Dashboard...", "yellow")) @@ -725,7 +704,9 @@ class Agent: ) ) - def loop_count_print(self, loop_count, max_loops): + def loop_count_print( + self, loop_count: int, max_loops: int + ) -> None: """loop_count_print summary Args: @@ -735,6 +716,7 @@ class Agent: print(colored(f"\nLoop {loop_count} of {max_loops}", "cyan")) print("\n") + # Check parameters def check_parameters(self): if self.llm is None: raise ValueError("Language model is not provided") @@ -748,8 +730,7 @@ class Agent: if self.context_length == 0: raise ValueError("Context length is not provided") - ########################## FUNCTION CALLING ########################## - @run_on_cpu + # Main function def _run( self, task: Optional[str] = None, @@ -759,7 +740,21 @@ class Agent: **kwargs, ) -> Any: """ - Run the autonomous agent loop + run the agent + + Args: + task (str): The task to be performed. + img (str): The image to be processed. + is_last (bool): Indicates if this is the last task. + + Returns: + Any: The output of the agent. + (string, list, json, dict, yaml) + + Examples: + agent(task="What is the capital of France?") + agent(task="What is the capital of France?", img="path/to/image.jpg") + agent(task="What is the capital of France?", img="path/to/image.jpg", is_last=True) """ try: self.check_if_no_prompt_then_autogenerate(task) @@ -769,20 +764,25 @@ class Agent: # Add task to memory self.short_memory.add(role=self.user_name, content=task) + # Plan + if self.plan_enabled is True: + self.plan(task) + # Set the loop count loop_count = 0 - # Clear the short memory response = None all_responses = [] + # Query the long term memory first for the context + if self.long_term_memory is not None: + self.memory_query(task) + while ( self.max_loops == "auto" or loop_count < self.max_loops ): loop_count += 1 - # Log step start - current_step_id = f"step_{loop_count}_{uuid.uuid4().hex}" self.loop_count_print(loop_count, self.max_loops) print("\n") @@ -800,9 +800,12 @@ class Agent: success = False while attempt < self.retry_attempts and not success: try: - if self.long_term_memory is not None: + if ( + self.long_term_memory is not None + and self.rag_every_loop is True + ): logger.info( - "Querying long term memory..." + "Querying RAG database for context..." ) self.memory_query(task_prompt) @@ -816,8 +819,15 @@ class Agent: *response_args, **kwargs ) - # Log step metadata - step_meta = self.log_step_metadata(loop_count, task_prompt, response) + # Convert to a str if the response is not a str + response = self.llm_output_parser(response) + + # Print + if self.streaming_on is True: + self.stream_response(response) + else: + logger.info(f"Response: {response}") + # Check if response is a dictionary and has 'choices' key if ( isinstance(response, dict) @@ -836,33 +846,14 @@ class Agent: # Check and execute tools if self.tools is not None: - tool_result = self.parse_and_execute_tools(response) - if tool_result: - self.update_tool_usage( - step_meta["step_id"], - tool_result["tool"], - tool_result["args"], - tool_result["response"] - ) - - - # Update agent output history - self.agent_output.full_history = self.short_memory.return_history_as_string() - - # Log the step metadata - logged = self.log_step_metadata( - loop_count, task_prompt, response - ) - logger.info(logged) - - # Convert to a str if the response is not a str - response = self.llm_output_parser(response) - - # Print - if self.streaming_on is True: - self.stream_response(response) - else: - print(response) + self.parse_and_execute_tools(response) + # if tool_result: + # self.update_tool_usage( + # step_meta["step_id"], + # tool_result["tool"], + # tool_result["args"], + # tool_result["response"], + # ) # Add the response to the memory self.short_memory.add( @@ -872,32 +863,7 @@ class Agent: # Add to all responses all_responses.append(response) - # TODO: Implement reliability check - if self.tools is not None: - # self.parse_function_call_and_execute(response) - self.parse_and_execute_tools(response) - # if self.code_interpreter is True: - # # Parse the code and execute - # logger.info("Parsing code and executing...") - # code = extract_code_from_markdown(response) - - # output = self.code_executor.execute(code) - - # # Add to memory - # self.short_memory.add( - # role=self.agent_name, content=output - # ) - - # # Run the llm on the output - # response = self.llm( - # self.short_memory.return_history_as_string() - # ) - - # # Add to all responses - # all_responses.append(response) - # self.short_memory.add( - # role=self.agent_name, content=response - # ) + # # TODO: Implement reliability check if self.evaluator: logger.info("Evaluating response...") @@ -909,19 +875,15 @@ class Agent: f" {evaluated_response}" ) self.short_memory.add( - role=self.agent_name, + role="Evaluator", content=evaluated_response, ) - # all_responses.append(evaluated_response) - # Sentiment analysis if self.sentiment_analyzer: logger.info("Analyzing sentiment...") self.sentiment_analysis_handler(response) - # print(response) - success = True # Mark as successful to exit the retry loop except Exception as e: @@ -938,9 +900,7 @@ class Agent: ) break # Exit the loop if all retry attempts fail - # # Check stopping conditions - # if self.stopping_token in response: - # break + # Check stopping conditions if ( self.stopping_condition is not None and self._check_stopping_condition(response) @@ -978,7 +938,7 @@ class Agent: if self.autosave is True: logger.info("Autosaving agent state.") - self.save_state(self.saved_state_path) + self.save_state() # Apply the cleaner function to the response if self.output_cleaner is not None: @@ -987,8 +947,11 @@ class Agent: logger.info( f"Response after output cleaner: {response}" ) + self.short_memory.add( + role="Output Cleaner", + content=response, + ) - # print(response) if self.agent_ops_on is True and is_last is True: self.check_end_session_agentops() @@ -999,9 +962,21 @@ class Agent: if response is not None ] - # return self.agent_output_type(all_responses) + self.agent_output.steps = self.short_memory.to_dict() + self.agent_output.full_history = ( + self.short_memory.get_str() + ) + self.agent_output.total_tokens = ( + self.tokenizer.count_tokens( + self.short_memory.get_str() + ) + ) + # More flexible output types - if self.output_type == "string": + if ( + self.output_type == "string" + or self.output_type == "str" + ): return concat_strings(all_responses) elif self.output_type == "list": return all_responses @@ -1017,6 +992,10 @@ class Agent: return yaml.safe_dump( self.agent_output.model_dump(), sort_keys=False ) + elif self.return_step_meta is True: + return self.agent_output.model_dump_json(indent=4) + elif self.return_history is True: + return self.short_memory.get_str() else: raise ValueError( f"Invalid output type: {self.output_type}" @@ -1103,9 +1082,6 @@ class Agent: return output.getvalue() def parse_and_execute_tools(self, response: str, *args, **kwargs): - # Extract json from markdown - # response = extract_code_from_markdown(response) - # Try executing the tool if self.execute_tool is not False: try: @@ -1120,59 +1096,19 @@ class Agent: **kwargs, ) - print(f"Tool Output: {out}") + out = str(out) + + logger.info(f"Tool Output: {out}") # Add the output to the memory self.short_memory.add( - role=self.agent_name, + role="Tool Executor", content=out, ) except Exception as error: logger.error(f"Error executing tool: {error}") - print( - colored( - f"Error executing tool: {error}", - "red", - ) - ) - - # def long_term_memory_prompt(self, query: str, *args, **kwargs): - # """ - # Generate the agent long term memory prompt - - # Args: - # system_prompt (str): The system prompt - # history (List[str]): The history of the conversation - - # Returns: - # str: The agent history prompt - # """ - # try: - # logger.info(f"Querying long term memory database for {query}") - # ltr = self.long_term_memory.query(query, *args, **kwargs) - - # # Count the tokens - # logger.info("Couting tokens of retrieved document") - # ltr_count = self.tokenizer.count_tokens(ltr) - # logger.info(f"Retrieved document token count {ltr_count}") - - # if ltr_count > self.memory_chunk_size: - # logger.info( - # f"Truncating memory by {self.memory_chunk_size}" - # ) - # out = self.truncate_string_by_tokens( - # ltr, self.memory_chunk_size - # ) - # logger.info( - # f"Memory truncated by {self.memory_chunk_size}" - # ) - - # # Retrieve only the chunk size of the memory - # return out - # except Exception as error: - # logger.error(f"Error querying long term memory: {error}") - # raise error + raise error def add_memory(self, message: str): """Add a memory to the agent @@ -1188,7 +1124,7 @@ class Agent: role=self.agent_name, content=message ) - def plan(self, task: str, *args, **kwargs): + def plan(self, task: str, *args, **kwargs) -> None: """ Plan the task @@ -1199,10 +1135,13 @@ class Agent: if exists(self.planning_prompt): # Join the plan and the task planning_prompt = f"{self.planning_prompt} {task}" - plan = self.llm(planning_prompt) + plan = self.llm(planning_prompt, *args, **kwargs) + logger.info(f"Plan: {plan}") # Add the plan to the memory - self.short_memory.add(role=self.agent_name, content=plan) + self.short_memory.add( + role=self.agent_name, content=str(plan) + ) return None except Exception as error: @@ -1273,37 +1212,93 @@ class Agent: Args: file_path (_type_): _description_ """ + file_path = ( + f"{self.saved_state_path}.json" + or f"{self.agent_name}.json" + or f"{self.saved_state_path}.json" + ) try: create_file_in_folder( self.workspace_dir, - f"{self.saved_state_path}", - self.to_dict(), + file_path, + self.to_json(), ) - return "Saved agent history" + logger.info(f"Saved agent history to: {file_path}") except Exception as error: - print( - colored(f"Error saving agent history: {error}", "red") - ) + logger.error(f"Error saving agent history: {error}") + raise error - def load(self, file_path: str): + def load(self, file_path: str) -> None: """ - Load the agent history from a file. + Load the agent history from a file, excluding the LLM. Args: file_path (str): The path to the file containing the saved agent history. + + Raises: + FileNotFoundError: If the specified file path does not exist + json.JSONDecodeError: If the file contains invalid JSON + AttributeError: If there are issues setting agent attributes + Exception: For other unexpected errors """ - with open(file_path, "r") as file: - data = json.load(file) + try: + file_path = ( + f"{self.saved_state_path}.json" + or f"{self.agent_name}.json" + or f"{self.saved_state_path}.json" + ) - for key, value in data.items(): - setattr(self, key, value) + if not os.path.exists(file_path): + raise FileNotFoundError( + f"File not found at path: {file_path}" + ) - return "Loaded agent history" + with open(file_path, "r") as file: + try: + data = json.load(file) + except json.JSONDecodeError as e: + logger.error( + f"Invalid JSON in file {file_path}: {str(e)}" + ) + raise + + if not isinstance(data, dict): + raise ValueError( + f"Expected dict data but got {type(data)}" + ) + + # Store current LLM + current_llm = self.llm + + try: + for key, value in data.items(): + if key != "llm": + setattr(self, key, value) + except AttributeError as e: + logger.error( + f"Error setting agent attribute: {str(e)}" + ) + raise + + # Restore LLM + self.llm = current_llm + + logger.info( + f"Successfully loaded agent history from: {file_path}" + ) + + except Exception as e: + logger.error( + f"Unexpected error loading agent history: {str(e)}" + ) + raise + + return None def graceful_shutdown(self): """Gracefully shutdown the system saving the state""" logger.info("Shutting down the system...") - return self.save_state(f"{self.agent_name}.json") + return self.save() def analyze_feedback(self): """Analyze the feedback for issues""" @@ -1351,32 +1346,6 @@ class Agent: logger.info(f"Adding response filter: {filter_word}") self.reponse_filters.append(filter_word) - def code_interpreter_execution( - self, code: str, *args, **kwargs - ) -> str: - # Extract code from markdown - extracted_code = extract_code_from_markdown(code) - - # Execute the code - execution = CodeExecutor().execute(extracted_code) - - # Add the execution to the memory - self.short_memory.add( - role=self.agent_name, - content=execution, - ) - - # Run the llm again - response = self.llm( - self.short_memory.return_history_as_string(), - *args, - **kwargs, - ) - - print(f"Response after code interpretation: {response}") - - return response - def apply_reponse_filters(self, response: str) -> str: """ Apply the response filters to the response @@ -1412,14 +1381,15 @@ class Agent: with open(file_path, "w") as f: yaml.dump(self.to_dict(), f) except Exception as error: - print( + logger.error( colored(f"Error saving agent to YAML: {error}", "red") ) + raise error def get_llm_parameters(self): return str(vars(self.llm)) - def save_state(self, file_path: str, *args, **kwargs) -> None: + def save_state(self, *args, **kwargs) -> None: """ Saves the current state of the agent to a JSON file, including the llm parameters. @@ -1430,45 +1400,11 @@ class Agent: >>> agent.save_state('saved_flow.json') """ try: - logger.info( - f"Saving Agent {self.agent_name} state to: {file_path}" - ) - - json_data = self.to_json() - - create_file_in_folder( - self.workspace_dir, - file_path, - str(json_data), - ) - - # Log the saved state - logger.info(f"Saved agent state to: {file_path}") - except Exception as error: - logger.info(f"Error saving agent state: {error}") - raise error - - def load_state(self, file_path: str): - """ - Loads the state of the agent from a json file and restores the configuration and memory. - - - Example: - >>> agent = Agent(llm=llm_instance, max_loops=5) - >>> agent.load_state('saved_flow.json') - >>> agent.run("Continue with the task") - - """ - try: - with open(file_path, "r") as file: - data = json.load(file) - - for key, value in data.items(): - setattr(self, key, value) - - logger.info(f"Agent state loaded from {file_path}") + logger.info(f"Saving Agent {self.agent_name}") + self.save() + logger.info("Saved agent state") except Exception as error: - logger.info(f"Error loading agent state: {error}") + logger.error(f"Error saving agent state: {error}") raise error def update_system_prompt(self, system_prompt: str): @@ -1551,15 +1487,48 @@ class Agent: raise error def add_tool(self, tool: Callable): + """Add a single tool to the agent's tools list. + + Args: + tool (Callable): The tool function to add + + Returns: + The result of appending the tool to the tools list + """ + logger.info(f"Adding tool: {tool.__name__}") return self.tools.append(tool) def add_tools(self, tools: List[Callable]): + """Add multiple tools to the agent's tools list. + + Args: + tools (List[Callable]): List of tool functions to add + + Returns: + The result of extending the tools list + """ + logger.info(f"Adding tools: {[t.__name__ for t in tools]}") return self.tools.extend(tools) def remove_tool(self, tool: Callable): + """Remove a single tool from the agent's tools list. + + Args: + tool (Callable): The tool function to remove + + Returns: + The result of removing the tool from the tools list + """ + logger.info(f"Removing tool: {tool.__name__}") return self.tools.remove(tool) def remove_tools(self, tools: List[Callable]): + """Remove multiple tools from the agent's tools list. + + Args: + tools (List[Callable]): List of tool functions to remove + """ + logger.info(f"Removing tools: {[t.__name__ for t in tools]}") for tool in tools: self.tools.remove(tool) @@ -1599,7 +1568,7 @@ class Agent: "Could not import agentops, try installing agentops: $ pip3 install agentops" ) - def memory_query(self, task: str = None, *args, **kwargs) -> str: + def memory_query(self, task: str = None, *args, **kwargs) -> None: try: # Query the long term memory if self.long_term_memory is not None: @@ -1608,32 +1577,29 @@ class Agent: task, *args, **kwargs ) + memory_retrieval = ( + f"Documents Available: {str(memory_retrieval)}" + ) + + # Count the tokens memory_token_count = self.tokenizer.count_tokens( memory_retrieval ) - if memory_token_count > self.memory_chunk_size: # Truncate the memory by the memory chunk size memory_retrieval = self.truncate_string_by_tokens( memory_retrieval, self.memory_chunk_size ) - # Merge the task prompt with the memory retrieval - task_prompt = ( - f"{task} Documents Available: {memory_retrieval}" - ) - - response = self.llm(task_prompt, *args, **kwargs) - print(response) - self.short_memory.add( - role=self.agent_name, content=response + role="Database", + content=memory_retrieval, ) - return response + return None except Exception as e: - print(f"An error occurred: {e}") - return None + logger.error(f"An error occurred: {e}") + raise e def sentiment_analysis_handler(self, response: str = None): """ @@ -1829,26 +1795,6 @@ class Agent: else: return input_string - def if_tokens_exceeds_context_length(self): - # Check if tokens exceeds the context length - try: - tokens_used = self.tokenizer.count_tokens( - self.short_memory.return_history_as_string() - ) - if tokens_used > self.context_length: - logger.warning( - "Tokens used exceeds the context length." - ) - logger.info( - f"Tokens available: {tokens_used - self.context_length}" - ) - return True - else: - return False - except Exception as e: - logger.error(f"Error checking tokens: {e}") - return None - def tokens_operations(self, input_string: str) -> str: """ Perform various operations on tokens of an input string. @@ -1936,27 +1882,7 @@ class Agent: "Could not import agentops, try installing agentops: $ pip3 install agentops" ) - async def count_tokens_and_subtract_from_context_window( - self, response: str, *args, **kwargs - ): - """ - Count the number of tokens in the response and subtract it from the context window. - - Args: - response (str): The response to count the tokens from. - - Returns: - str: The response after counting the tokens and subtracting it from the context window. - """ - # Count the number of tokens in the response - tokens = self.tokenizer.count_tokens(response) - - # Subtract the number of tokens from the context window - self.context_length -= len(tokens) - - return response - - def llm_output_parser(self, response): + def llm_output_parser(self, response: Any) -> str: """Parse the output from the LLM""" try: if isinstance(response, dict): @@ -1986,45 +1912,71 @@ class Agent: """Log metadata for each step of agent execution.""" # Generate unique step ID step_id = f"step_{loop}_{uuid.uuid4().hex}" - + # Calculate token usage # full_memory = self.short_memory.return_history_as_string() # prompt_tokens = self.tokenizer.count_tokens(full_memory) # completion_tokens = self.tokenizer.count_tokens(response) # total_tokens = prompt_tokens + completion_tokens - total_tokens=self.tokenizer.count_tokens(task) + self.tokenizer.count_tokens(response), + total_tokens = ( + self.tokenizer.count_tokens(task) + + self.tokenizer.count_tokens(response), + ) - # Get memory responses - memory_responses = { - "short_term": self.short_memory.return_history_as_string() if self.short_memory else None, - "long_term": self.long_term_memory.query(task) if self.long_term_memory else None - } + # # Get memory responses + # memory_responses = { + # "short_term": ( + # self.short_memory.return_history_as_string() + # if self.short_memory + # else None + # ), + # "long_term": ( + # self.long_term_memory.query(task) + # if self.long_term_memory + # else None + # ), + # } + + # # Get tool responses if tool was used + # if self.tools: + # try: + # tool_call_output = parse_and_execute_json( + # self.tools, response, parse_md=True + # ) + # if tool_call_output: + # { + # "tool_name": tool_call_output.get( + # "tool_name", "unknown" + # ), + # "tool_args": tool_call_output.get("args", {}), + # "tool_output": str( + # tool_call_output.get("output", "") + # ), + # } + # except Exception as e: + # logger.debug( + # f"No tool call detected in response: {e}" + # ) - # Get tool responses if tool was used - tool_response = None - if self.tools: - try: - tool_call_output = parse_and_execute_json(self.tools, response, parse_md=True) - if tool_call_output: - tool_response = { - "tool_name": tool_call_output.get("tool_name", "unknown"), - "tool_args": tool_call_output.get("args", {}), - "tool_output": str(tool_call_output.get("output", "")) - } - except Exception as e: - logger.debug(f"No tool call detected in response: {e}") - # Create memory usage tracking - memory_usage = { - "short_term": len(self.short_memory.messages) if self.short_memory else 0, - "long_term": self.long_term_memory.count if self.long_term_memory else 0, - "responses": memory_responses - } - + # memory_usage = { + # "short_term": ( + # len(self.short_memory.messages) + # if self.short_memory + # else 0 + # ), + # "long_term": ( + # self.long_term_memory.count + # if self.long_term_memory + # else 0 + # ), + # "responses": memory_responses, + # } + step_log = Step( step_id=step_id, time=time.time(), - tokens = total_tokens, + tokens=total_tokens, response=AgentChatCompletionResponse( id=self.agent_id, agent_name=self.agent_name, @@ -2042,28 +1994,39 @@ class Agent: # completion_tokens=completion_tokens, # total_tokens=total_tokens, # ), - tool_calls=[] if tool_response is None else [tool_response], - memory_usage=memory_usage + # tool_calls=( + # [] if tool_response is None else [tool_response] + # ), + # memory_usage=None, ), ) - + # Update total tokens if agent_output exists - if hasattr(self, 'agent_output'): - self.agent_output.total_tokens += self.response.total_tokens - - + # if hasattr(self, "agent_output"): + # self.agent_output.total_tokens += ( + # self.response.total_tokens + # ) + # Add step to agent output tracking self.step_pool.append(step_log) - def update_tool_usage(self, step_id: str, tool_name: str, tool_args: dict, tool_response: Any): + def update_tool_usage( + self, + step_id: str, + tool_name: str, + tool_args: dict, + tool_response: Any, + ): """Update tool usage information for a specific step.""" for step in self.agent_output.steps: if step.step_id == step_id: - step.response.tool_calls.append({ - "tool": tool_name, - "arguments": tool_args, - "response": str(tool_response) - }) + step.response.tool_calls.append( + { + "tool": tool_name, + "arguments": tool_args, + "response": str(tool_response), + } + ) break def _serialize_callable( @@ -2158,7 +2121,7 @@ class Agent: create_file_in_folder( self.workspace_dir, f"{self.agent_name}.yaml", - self.to_yaml(), + str(self.to_yaml()), ) return f"Model saved to {self.workspace_dir}/{self.agent_name}.yaml" @@ -2248,25 +2211,7 @@ class Agent: role=self.user_name, content=self.sop ) - def agent_output_type(self, responses: list): - if self.output_type == "list": - return responses - - elif self.output_type == "str" or "string": - return concat_strings(responses) - - elif self.return_step_meta is True: - return self.agent_output.model_dump_json(indent=4) - - elif self.output_type == "yaml": - model = YamlModel() - return model.dict_to_yaml(self.agent_output.model_dump()) - - elif self.output_type == "dict": - return self.agent_output.model_dump() - - elif self.return_history: - return self.short_memory.return_history_as_string() + logger.info("SOP Uploaded into the memory") def run( self, diff --git a/swarms/structs/conversation.py b/swarms/structs/conversation.py index 8e516e9c..768c19c5 100644 --- a/swarms/structs/conversation.py +++ b/swarms/structs/conversation.py @@ -1,11 +1,11 @@ import datetime import json -from typing import Optional +from typing import Any, Optional +import yaml from termcolor import colored from swarms.structs.base_structure import BaseStructure -from typing import Any class Conversation(BaseStructure): @@ -96,10 +96,10 @@ class Conversation(BaseStructure): self.add("System: ", self.system_prompt) if self.rules is not None: - self.add(user, rules) + self.add("User", rules) if custom_rules_prompt is not None: - self.add(user, custom_rules_prompt) + self.add(user or "User", custom_rules_prompt) # If tokenizer then truncate if tokenizer is not None: @@ -245,6 +245,9 @@ class Conversation(BaseStructure): ] ) + def get_str(self): + return self.return_history_as_string() + def save_as_json(self, filename: str = None): """Save the conversation history as a JSON file @@ -379,3 +382,21 @@ class Conversation(BaseStructure): def clear(self): self.conversation_history = [] + + def to_json(self): + return json.dumps(self.conversation_history) + + def to_dict(self): + return self.conversation_history + + def to_yaml(self): + return yaml.dump(self.conversation_history) + + +# # Example usage +# conversation = Conversation() +# conversation.add("user", "Hello, how are you?") +# conversation.add("assistant", "I am doing well, thanks.") +# # print(conversation.to_json()) +# print(type(conversation.to_dict())) +# # print(conversation.to_yaml()) diff --git a/swarms/structs/multi_agent_exec.py b/swarms/structs/multi_agent_exec.py index 36b7a81c..eec2288c 100644 --- a/swarms/structs/multi_agent_exec.py +++ b/swarms/structs/multi_agent_exec.py @@ -137,15 +137,18 @@ def run_agents_concurrently_multiprocess( return results + @profile_func -def run_agents_sequentially(agents: List[AgentType], task: str) -> List[Any]: +def run_agents_sequentially( + agents: List[AgentType], task: str +) -> List[Any]: """ Run multiple agents sequentially for baseline comparison. - + Args: agents: List of Agent instances to run task: Task string to execute - + Returns: List of outputs from each agent """ @@ -154,44 +157,52 @@ def run_agents_sequentially(agents: List[AgentType], task: str) -> List[Any]: @profile_func def run_agents_with_different_tasks( - agent_task_pairs: List[tuple[AgentType, str]], + agent_task_pairs: List[tuple[AgentType, str]], batch_size: int = None, - max_workers: int = None + max_workers: int = None, ) -> List[Any]: """ Run multiple agents with different tasks concurrently. - + Args: agent_task_pairs: List of (agent, task) tuples batch_size: Number of agents to run in parallel max_workers: Maximum number of threads - + Returns: List of outputs from each agent """ - async def run_pair_async(pair: tuple[AgentType, str], executor: ThreadPoolExecutor) -> Any: + + async def run_pair_async( + pair: tuple[AgentType, str], executor: ThreadPoolExecutor + ) -> Any: agent, task = pair return await run_agent_async(agent, task, executor) - + cpu_cores = cpu_count() batch_size = batch_size or cpu_cores max_workers = max_workers or cpu_cores * 2 results = [] - + try: loop = asyncio.get_event_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - + with ThreadPoolExecutor(max_workers=max_workers) as executor: for i in range(0, len(agent_task_pairs), batch_size): batch = agent_task_pairs[i : i + batch_size] batch_results = loop.run_until_complete( - asyncio.gather(*(run_pair_async(pair, executor) for pair in batch)) + asyncio.gather( + *( + run_pair_async(pair, executor) + for pair in batch + ) + ) ) results.extend(batch_results) - + return results @@ -199,46 +210,46 @@ async def run_agent_with_timeout( agent: AgentType, task: str, timeout: float, - executor: ThreadPoolExecutor + executor: ThreadPoolExecutor, ) -> Any: """ Run an agent with a timeout limit. - + Args: agent: Agent instance to run task: Task string to execute timeout: Timeout in seconds executor: ThreadPoolExecutor instance - + Returns: Agent execution result or None if timeout occurs """ try: return await asyncio.wait_for( - run_agent_async(agent, task, executor), - timeout=timeout + run_agent_async(agent, task, executor), timeout=timeout ) except asyncio.TimeoutError: return None + @profile_func def run_agents_with_timeout( agents: List[AgentType], task: str, timeout: float, batch_size: int = None, - max_workers: int = None + max_workers: int = None, ) -> List[Any]: """ Run multiple agents concurrently with a timeout for each agent. - + Args: agents: List of Agent instances task: Task string to execute timeout: Timeout in seconds for each agent batch_size: Number of agents to run in parallel max_workers: Maximum number of threads - + Returns: List of outputs (None for timed out agents) """ @@ -246,28 +257,29 @@ def run_agents_with_timeout( batch_size = batch_size or cpu_cores max_workers = max_workers or cpu_cores * 2 results = [] - + try: loop = asyncio.get_event_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - + with ThreadPoolExecutor(max_workers=max_workers) as executor: for i in range(0, len(agents), batch_size): batch = agents[i : i + batch_size] batch_results = loop.run_until_complete( asyncio.gather( - *(run_agent_with_timeout(agent, task, timeout, executor) - for agent in batch) + *( + run_agent_with_timeout( + agent, task, timeout, executor + ) + for agent in batch + ) ) ) results.extend(batch_results) - - return results - - + return results @dataclass @@ -276,45 +288,52 @@ class ResourceMetrics: memory_percent: float active_threads: int + def get_system_metrics() -> ResourceMetrics: """Get current system resource usage""" return ResourceMetrics( cpu_percent=psutil.cpu_percent(), memory_percent=psutil.virtual_memory().percent, - active_threads=threading.active_count() + active_threads=threading.active_count(), ) + @profile_func def run_agents_with_resource_monitoring( agents: List[AgentType], task: str, cpu_threshold: float = 90.0, memory_threshold: float = 90.0, - check_interval: float = 1.0 + check_interval: float = 1.0, ) -> List[Any]: """ Run agents with system resource monitoring and adaptive batch sizing. - + Args: agents: List of Agent instances task: Task string to execute cpu_threshold: Max CPU usage percentage memory_threshold: Max memory usage percentage check_interval: Resource check interval in seconds - + Returns: List of outputs from each agent """ + async def monitor_resources(): while True: metrics = get_system_metrics() - if metrics.cpu_percent > cpu_threshold or metrics.memory_percent > memory_threshold: + if ( + metrics.cpu_percent > cpu_threshold + or metrics.memory_percent > memory_threshold + ): # Reduce batch size or pause execution pass await asyncio.sleep(check_interval) - + # Implementation details... - + + # # Example usage: # # Initialize your agents with the same model to avoid re-creating it # agents = [ @@ -341,4 +360,3 @@ def run_agents_with_resource_monitoring( # for i, output in enumerate(outputs): # print(f"Output from agent {i+1}:\n{output}") - diff --git a/swarms/tools/tool_parse_exec.py b/swarms/tools/tool_parse_exec.py index 15f04d07..8686781a 100644 --- a/swarms/tools/tool_parse_exec.py +++ b/swarms/tools/tool_parse_exec.py @@ -1,15 +1,17 @@ import json -from typing import List +from typing import List, Any, Callable from swarms.utils.loguru_logger import logger from swarms.utils.parse_code import extract_code_from_markdown def parse_and_execute_json( - functions: List[callable] = None, - json_string: str = None, + functions: List[Callable[..., Any]], + json_string: str, parse_md: bool = False, -): + verbose: bool = False, + return_str: bool = True, +) -> dict: """ Parses and executes a JSON string containing function names and parameters. @@ -17,191 +19,104 @@ def parse_and_execute_json( functions (List[callable]): A list of callable functions. json_string (str): The JSON string to parse and execute. parse_md (bool): Flag indicating whether to extract code from Markdown. - + verbose (bool): Flag indicating whether to enable verbose logging. + return_str (bool): Flag indicating whether to return a JSON string. Returns: - A dictionary containing the results of executing the functions with the parsed parameters. - + dict: A dictionary containing the results of executing the functions with the parsed parameters. """ + if not functions or not json_string: + raise ValueError("Functions and JSON string are required") + if parse_md: json_string = extract_code_from_markdown(json_string) try: - # Create a dictionary that maps function names to functions + # Create function name to function mapping function_dict = {func.__name__: func for func in functions} + if verbose: + logger.info( + f"Available functions: {list(function_dict.keys())}" + ) + logger.info(f"Processing JSON: {json_string}") + + # Parse JSON data data = json.loads(json_string) - function_list = ( - data.get("functions", []) - if data.get("functions") - else [data.get("function", [])] - ) + + # Handle both single function and function list formats + function_list = [] + if "functions" in data: + function_list = data["functions"] + elif "function" in data: + function_list = [data["function"]] + else: + function_list = [ + data + ] # Assume entire object is single function + + # Ensure function_list is a list and filter None values + if isinstance(function_list, dict): + function_list = [function_list] + function_list = [f for f in function_list if f] + + if verbose: + logger.info(f"Processing {len(function_list)} functions") results = {} for function_data in function_list: function_name = function_data.get("name") - parameters = function_data.get("parameters") + parameters = function_data.get("parameters", {}) - # Check if the function name is in the function dictionary - if function_name in function_dict: - # Call the function with the parsed parameters - result = function_dict[function_name](**parameters) - results[function_name] = str(result) - else: + if not function_name: + logger.warning("Function data missing name field") + continue + + if verbose: + logger.info( + f"Executing {function_name} with params: {parameters}" + ) + + if function_name not in function_dict: + logger.warning(f"Function {function_name} not found") results[function_name] = None + continue - return results + try: + result = function_dict[function_name](**parameters) + results[function_name] = str(result) + if verbose: + logger.info( + f"Result for {function_name}: {result}" + ) + except Exception as e: + logger.error( + f"Error executing {function_name}: {str(e)}" + ) + results[function_name] = f"Error: {str(e)}" + + # Format final results + if len(results) == 1: + # Return single result directly + data = {"result": next(iter(results.values()))} + else: + # Return all results + data = { + "results": results, + "summary": "\n".join( + f"{k}: {v}" for k, v in results.items() + ), + } + + if return_str: + return json.dumps(data) + else: + return data + + except json.JSONDecodeError as e: + error = f"Invalid JSON format: {str(e)}" + logger.error(error) + return {"error": error} except Exception as e: - logger.error(f"Error parsing and executing JSON: {e}") - return None - - -# def parse_and_execute_json( -# functions: List[Callable[..., Any]], -# json_string: str = None, -# parse_md: bool = False, -# verbose: bool = False, -# ) -> Dict[str, Any]: -# """ -# Parses and executes a JSON string containing function names and parameters. - -# Args: -# functions (List[Callable]): A list of callable functions. -# json_string (str): The JSON string to parse and execute. -# parse_md (bool): Flag indicating whether to extract code from Markdown. -# verbose (bool): Flag indicating whether to enable verbose logging. - -# Returns: -# Dict[str, Any]: A dictionary containing the results of executing the functions with the parsed parameters. -# """ -# if parse_md: -# json_string = extract_code_from_markdown(json_string) - -# logger.info("Number of functions: " + str(len(functions))) - -# try: -# # Create a dictionary that maps function names to functions -# function_dict = {func.__name__: func for func in functions} - -# data = json.loads(json_string) -# function_list = data.get("functions") or [data.get("function")] - -# # Ensure function_list is a list and filter out None values -# if isinstance(function_list, dict): -# function_list = [function_list] -# else: -# function_list = [f for f in function_list if f] - -# results = {} - -# # Determine if concurrency is needed -# concurrency = len(function_list) > 1 - -# if concurrency: -# with concurrent.futures.ThreadPoolExecutor() as executor: -# future_to_function = { -# executor.submit( -# execute_and_log_function, -# function_dict, -# function_data, -# verbose, -# ): function_data -# for function_data in function_list -# } -# for future in concurrent.futures.as_completed( -# future_to_function -# ): -# function_data = future_to_function[future] -# try: -# result = future.result() -# results.update(result) -# except Exception as e: -# if verbose: -# logger.error( -# f"Error executing function {function_data.get('name')}: {e}" -# ) -# results[function_data.get("name")] = None -# else: -# for function_data in function_list: -# function_name = function_data.get("name") -# parameters = function_data.get("parameters") - -# if verbose: -# logger.info( -# f"Executing function: {function_name} with parameters: {parameters}" -# ) - -# if function_name in function_dict: -# try: -# result = function_dict[function_name](**parameters) -# results[function_name] = str(result) -# if verbose: -# logger.info( -# f"Result for function {function_name}: {result}" -# ) -# except Exception as e: -# if verbose: -# logger.error( -# f"Error executing function {function_name}: {e}" -# ) -# results[function_name] = None -# else: -# if verbose: -# logger.warning( -# f"Function {function_name} not found." -# ) -# results[function_name] = None - -# # Merge all results into a single string -# merged_results = "\n".join( -# f"{key}: {value}" for key, value in results.items() -# ) - -# return {"merged_results": merged_results} -# except Exception as e: -# logger.error(f"Error parsing and executing JSON: {e}") -# return None - - -# def execute_and_log_function( -# function_dict: Dict[str, Callable], -# function_data: Dict[str, Any], -# verbose: bool, -# ) -> Dict[str, Any]: -# """ -# Executes a function from a given dictionary of functions and logs the execution details. - -# Args: -# function_dict (Dict[str, Callable]): A dictionary containing the available functions. -# function_data (Dict[str, Any]): A dictionary containing the function name and parameters. -# verbose (bool): A flag indicating whether to log the execution details. - -# Returns: -# Dict[str, Any]: A dictionary containing the function name and its result. - -# """ -# function_name = function_data.get("name") -# parameters = function_data.get("parameters") - -# if verbose: -# logger.info( -# f"Executing function: {function_name} with parameters: {parameters}" -# ) - -# if function_name in function_dict: -# try: -# result = function_dict[function_name](**parameters) -# if verbose: -# logger.info( -# f"Result for function {function_name}: {result}" -# ) -# return {function_name: str(result)} -# except Exception as e: -# if verbose: -# logger.error( -# f"Error executing function {function_name}: {e}" -# ) -# return {function_name: None} -# else: -# if verbose: -# logger.warning(f"Function {function_name} not found.") -# return {function_name: None} + error = f"Error parsing and executing JSON: {str(e)}" + logger.error(error) + return {"error": error} diff --git a/tests/agents/test_agent_logging.py b/tests/agents/test_agent_logging.py index b2106dd6..1439935e 100644 --- a/tests/agents/test_agent_logging.py +++ b/tests/agents/test_agent_logging.py @@ -1,9 +1,5 @@ -from unittest.mock import Mock, MagicMock -from dataclasses import dataclass, field, asdict -from typing import List, Dict, Any -from datetime import datetime +from unittest.mock import MagicMock import unittest -from swarms.schemas.agent_step_schemas import ManySteps, Step from swarms.structs.agent import Agent from swarms.tools.tool_parse_exec import parse_and_execute_json @@ -12,61 +8,80 @@ parse_and_execute_json = MagicMock() parse_and_execute_json.return_value = { "tool_name": "calculator", "args": {"numbers": [2, 2]}, - "output": "4" + "output": "4", } + class TestAgentLogging(unittest.TestCase): def setUp(self): self.mock_tokenizer = MagicMock() self.mock_tokenizer.count_tokens.return_value = 100 - + self.mock_short_memory = MagicMock() - self.mock_short_memory.get_memory_stats.return_value = {"message_count": 2} - + self.mock_short_memory.get_memory_stats.return_value = { + "message_count": 2 + } + self.mock_long_memory = MagicMock() - self.mock_long_memory.get_memory_stats.return_value = {"item_count": 5} - + self.mock_long_memory.get_memory_stats.return_value = { + "item_count": 5 + } + self.agent = Agent( tokenizer=self.mock_tokenizer, short_memory=self.mock_short_memory, - long_term_memory=self.mock_long_memory + long_term_memory=self.mock_long_memory, ) def test_log_step_metadata_basic(self): - log_result = self.agent.log_step_metadata(1, "Test prompt", "Test response") - - self.assertIn('step_id', log_result) - self.assertIn('timestamp', log_result) - self.assertIn('tokens', log_result) - self.assertIn('memory_usage', log_result) - - self.assertEqual(log_result['tokens']['total'], 200) + log_result = self.agent.log_step_metadata( + 1, "Test prompt", "Test response" + ) + + self.assertIn("step_id", log_result) + self.assertIn("timestamp", log_result) + self.assertIn("tokens", log_result) + self.assertIn("memory_usage", log_result) + + self.assertEqual(log_result["tokens"]["total"], 200) def test_log_step_metadata_no_long_term_memory(self): self.agent.long_term_memory = None - log_result = self.agent.log_step_metadata(1, "prompt", "response") - self.assertEqual(log_result['memory_usage']['long_term'], {}) - + log_result = self.agent.log_step_metadata( + 1, "prompt", "response" + ) + self.assertEqual(log_result["memory_usage"]["long_term"], {}) + def test_log_step_metadata_timestamp(self): - log_result = self.agent.log_step_metadata(1, "prompt", "response") - self.assertIn('timestamp', log_result) + log_result = self.agent.log_step_metadata( + 1, "prompt", "response" + ) + self.assertIn("timestamp", log_result) def test_token_counting_integration(self): self.mock_tokenizer.count_tokens.side_effect = [150, 250] - log_result = self.agent.log_step_metadata(1, "prompt", "response") - - self.assertEqual(log_result['tokens']['total'], 400) + log_result = self.agent.log_step_metadata( + 1, "prompt", "response" + ) + + self.assertEqual(log_result["tokens"]["total"], 400) def test_agent_output_updating(self): - initial_total_tokens = sum(step['tokens']['total'] for step in self.agent.agent_output.steps) - self.agent.log_step_metadata(1, "prompt", "response") - - final_total_tokens = sum(step['tokens']['total'] for step in self.agent.agent_output.steps) - self.assertEqual( - final_total_tokens - initial_total_tokens, - 200 - ) - self.assertEqual(len(self.agent.agent_output.steps), 1) + initial_total_tokens = sum( + step["tokens"]["total"] + for step in self.agent.agent_output.steps + ) + self.agent.log_step_metadata(1, "prompt", "response") + + final_total_tokens = sum( + step["tokens"]["total"] + for step in self.agent.agent_output.steps + ) + self.assertEqual( + final_total_tokens - initial_total_tokens, 200 + ) + self.assertEqual(len(self.agent.agent_output.steps), 1) + class TestAgentLoggingIntegration(unittest.TestCase): def setUp(self): @@ -75,24 +90,25 @@ class TestAgentLoggingIntegration(unittest.TestCase): def test_full_logging_cycle(self): task = "Test task" max_loops = 1 - + result = self.agent._run(task, max_loops=max_loops) - + self.assertIsInstance(result, dict) - self.assertIn('steps', result) - self.assertIsInstance(result['steps'], list) - self.assertEqual(len(result['steps']), max_loops) - - if result['steps']: - step = result['steps'][0] - self.assertIn('step_id', step) - self.assertIn('timestamp', step) - self.assertIn('task', step) - self.assertIn('response', step) - self.assertEqual(step['task'], task) - self.assertEqual(step['response'], f"Response for loop 1") - + self.assertIn("steps", result) + self.assertIsInstance(result["steps"], list) + self.assertEqual(len(result["steps"]), max_loops) + + if result["steps"]: + step = result["steps"][0] + self.assertIn("step_id", step) + self.assertIn("timestamp", step) + self.assertIn("task", step) + self.assertIn("response", step) + self.assertEqual(step["task"], task) + self.assertEqual(step["response"], "Response for loop 1") + self.assertTrue(len(self.agent.agent_output.steps) > 0) -if __name__ == '__main__': + +if __name__ == "__main__": unittest.main() diff --git a/tests/structs/test_agent.py b/tests/structs/test_agent.py index 4a145029..1661e354 100644 --- a/tests/structs/test_agent.py +++ b/tests/structs/test_agent.py @@ -541,7 +541,7 @@ def test_flow_load_state(flow_instance): "max_loops": 10, "autosave_path": "/path/to/load", } - flow_instance.load_state(state) + flow_instance.load(state) assert flow_instance.get_current_prompt() == "Loaded prompt" assert "Step 1" in flow_instance.get_instructions() assert "User message 1" in flow_instance.get_user_messages()