diff --git a/api/agent_api_test.py b/api/agent_api_test.py index 10addd36..c8420227 100644 --- a/api/agent_api_test.py +++ b/api/agent_api_test.py @@ -17,6 +17,10 @@ logging.basicConfig( ) logger = logging.getLogger(__name__) +from typing import Dict, Optional, Tuple +from uuid import UUID + +BASE_URL = "http://0.0.0.0:8000/v1" # Configuration @dataclass @@ -132,6 +136,191 @@ class TestRunner: logger.info(f"\nRunning test: {test_name}") start_time = time.time() +<<<<<<< HEAD +======= + +def create_test_user(session: TestSession) -> Tuple[bool, str]: + """Create a test user and store credentials in session.""" + logger.info("Creating test user") + + try: + response = requests.post( + f"{BASE_URL}/users", + json={"username": f"test_user_{int(time.time())}"}, + ) + + if response.status_code == 200: + data = response.json() + session.user_id = data["user_id"] + session.api_key = data["api_key"] + logger.success(f"Created user with ID: {session.user_id}") + return True, "Success" + else: + logger.error(f"Failed to create user: {response.text}") + return False, response.text + except Exception as e: + logger.exception("Exception during user creation") + return False, str(e) + + +def create_additional_api_key( + session: TestSession, +) -> Tuple[bool, str]: + """Test creating an additional API key.""" + logger.info("Creating additional API key") + + try: + response = requests.post( + f"{BASE_URL}/users/{session.user_id}/api-keys", + headers=session.headers, + json={"name": "Test Key"}, + ) + + if response.status_code == 200: + logger.success("Created additional API key") + return True, response.json()["key"] + else: + logger.error(f"Failed to create API key: {response.text}") + return False, response.text + except Exception as e: + logger.exception("Exception during API key creation") + return False, str(e) + + +def test_create_agent( + session: TestSession, +) -> Tuple[bool, Optional[UUID]]: + """Test creating a new agent.""" + logger.info("Testing agent creation") + + payload = { + "agent_name": f"Test Agent {int(time.time())}", + "system_prompt": "You are a helpful assistant", + "model_name": "gpt-4", + "description": "Test agent", + "tags": ["test", "automated"], + } + + try: + response = requests.post( + f"{BASE_URL}/agent", headers=session.headers, json=payload + ) + + if response.status_code == 200: + agent_id = response.json()["agent_id"] + session.test_agents.append(agent_id) + logger.success(f"Created agent with ID: {agent_id}") + return True, agent_id + else: + logger.error(f"Failed to create agent: {response.text}") + return False, None + except Exception: + logger.exception("Exception during agent creation") + return False, None + + +def test_list_user_agents(session: TestSession) -> bool: + """Test listing user's agents.""" + logger.info("Testing user agent listing") + + try: + response = requests.get( + f"{BASE_URL}/users/me/agents", headers=session.headers + ) + + if response.status_code == 200: + agents = response.json() + logger.success(f"Found {len(agents)} user agents") + return True + else: + logger.error( + f"Failed to list user agents: {response.text}" + ) + return False + except Exception: + logger.exception("Exception during agent listing") + return False + + +def test_agent_operations( + session: TestSession, agent_id: UUID +) -> bool: + """Test various operations on an agent.""" + logger.info(f"Testing operations for agent {agent_id}") + + # Test update + try: + update_response = requests.patch( + f"{BASE_URL}/agent/{agent_id}", + headers=session.headers, + json={ + "description": "Updated description", + "tags": ["test", "updated"], + }, + ) + if update_response.status_code != 200: + logger.error( + f"Failed to update agent: {update_response.text}" + ) + return False + + # Test metrics + metrics_response = requests.get( + f"{BASE_URL}/agent/{agent_id}/metrics", + headers=session.headers, + ) + if metrics_response.status_code != 200: + logger.error( + f"Failed to get agent metrics: {metrics_response.text}" + ) + return False + + logger.success("Successfully performed agent operations") + return True + except Exception: + logger.exception("Exception during agent operations") + return False + + +def test_completion(session: TestSession, agent_id: UUID) -> bool: + """Test running a completion.""" + logger.info("Testing completion") + + payload = { + "prompt": "What is the weather like today?", + "agent_id": agent_id, + "max_tokens": 100, + } + + try: + response = requests.post( + f"{BASE_URL}/agent/completions", + headers=session.headers, + json=payload, + ) + + if response.status_code == 200: + completion_data = response.json() + print(completion_data) + logger.success( + f"Got completion, used {completion_data['token_usage']['total_tokens']} tokens" + ) + return True + else: + logger.error(f"Failed to get completion: {response.text}") + return False + except Exception: + logger.exception("Exception during completion") + return False + + +def cleanup_test_resources(session: TestSession): + """Clean up all test resources.""" + logger.info("Cleaning up test resources") + + # Delete test agents + for agent_id in session.test_agents: +>>>>>>> 68728698 ([AGENT][LiteLLM FIX] [API FIX]) try: test_func() self.results["passed"] += 1 @@ -281,6 +470,7 @@ Total Time: {self.results['total_time']:.2f}s if __name__ == "__main__": +<<<<<<< HEAD try: runner = TestRunner() runner.run_all_tests() @@ -289,3 +479,7 @@ if __name__ == "__main__": except Exception as e: logger.error(f"Test suite failed: {str(e)}") logger.exception(e) +======= + success = run_test_workflow() + print(success) +>>>>>>> 68728698 ([AGENT][LiteLLM FIX] [API FIX]) diff --git a/example.py b/example.py index 76c23353..96670e1b 100644 --- a/example.py +++ b/example.py @@ -9,20 +9,21 @@ agent = Agent( agent_description="Personal finance advisor agent", system_prompt=FINANCIAL_AGENT_SYS_PROMPT + "Output the token when you're done creating a portfolio of etfs, index, funds, and more for AI", - model_name="gpt-4o", # Use any model from litellm - max_loops="auto", + max_loops=1, + model_name="gpt-4o", dynamic_temperature_enabled=True, user_name="Kye", retry_attempts=3, - streaming_on=True, - context_length=16000, + # streaming_on=True, + context_length=8192, return_step_meta=False, output_type="str", # "json", "dict", "csv" OR "string" "yaml" and auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task - max_tokens=16000, # max output tokens - interactive=True, + max_tokens=4000, # max output tokens + # interactive=True, stopping_token="", - execute_tool=True, + saved_state_path="agent_00.json", + interactive=False, ) agent.run( diff --git a/api/main.py b/main.py similarity index 100% rename from api/main.py rename to main.py diff --git a/swarm_builder.py b/swarm_builder.py deleted file mode 100644 index f1d769b4..00000000 --- a/swarm_builder.py +++ /dev/null @@ -1,333 +0,0 @@ -import os -from typing import List, Optional -from datetime import datetime - -from pydantic import BaseModel, Field -from pydantic.v1 import validator -from loguru import logger -from tenacity import ( - retry, - stop_after_attempt, - wait_exponential, -) - -from swarm_models import OpenAIFunctionCaller, OpenAIChat -from swarms.structs.agent import Agent -from swarms.structs.swarm_router import SwarmRouter -from swarms.structs.agents_available import showcase_available_agents - - -BOSS_SYSTEM_PROMPT = """ -Manage a swarm of worker agents to efficiently serve the user by deciding whether to create new agents or delegate tasks. Ensure operations are efficient and effective. - -### Instructions: - -1. **Task Assignment**: - - Analyze available worker agents when a task is presented. - - Delegate tasks to existing agents with clear, direct, and actionable instructions if an appropriate agent is available. - - If no suitable agent exists, create a new agent with a fitting system prompt to handle the task. - -2. **Agent Creation**: - - Name agents according to the task they are intended to perform (e.g., "Twitter Marketing Agent"). - - Provide each new agent with a concise and clear system prompt that includes its role, objectives, and any tools it can utilize. - -3. **Efficiency**: - - Minimize redundancy and maximize task completion speed. - - Avoid unnecessary agent creation if an existing agent can fulfill the task. - -4. **Communication**: - - Be explicit in task delegation instructions to avoid ambiguity and ensure effective task execution. - - Require agents to report back on task completion or encountered issues. - -5. **Reasoning and Decisions**: - - Offer brief reasoning when selecting or creating agents to maintain transparency. - - Avoid using an agent if unnecessary, with a clear explanation if no agents are suitable for a task. - -# Output Format - -Present your plan in clear, bullet-point format or short concise paragraphs, outlining task assignment, agent creation, efficiency strategies, and communication protocols. - -# Notes - -- Preserve transparency by always providing reasoning for task-agent assignments and creation. -- Ensure instructions to agents are unambiguous to minimize error. - -""" - - -class AgentConfig(BaseModel): - """Configuration for an individual agent in a swarm""" - - name: str = Field( - description="The name of the agent", example="Research-Agent" - ) - description: str = Field( - description="A description of the agent's purpose and capabilities", - example="Agent responsible for researching and gathering information", - ) - system_prompt: str = Field( - description="The system prompt that defines the agent's behavior", - example="You are a research agent. Your role is to gather and analyze information...", - ) - - @validator("name") - def validate_name(cls, v): - if not v.strip(): - raise ValueError("Agent name cannot be empty") - return v.strip() - - @validator("system_prompt") - def validate_system_prompt(cls, v): - if not v.strip(): - raise ValueError("System prompt cannot be empty") - return v.strip() - - -class SwarmConfig(BaseModel): - """Configuration for a swarm of cooperative agents""" - - name: str = Field( - description="The name of the swarm", - example="Research-Writing-Swarm", - ) - description: str = Field( - description="The description of the swarm's purpose and capabilities", - example="A swarm of agents that work together to research topics and write articles", - ) - agents: List[AgentConfig] = Field( - description="The list of agents that make up the swarm", - min_items=1, - ) - - @validator("agents") - def validate_agents(cls, v): - if not v: - raise ValueError("Swarm must have at least one agent") - return v - - -class AutoSwarmBuilder: - """A class that automatically builds and manages swarms of AI agents with enhanced error handling.""" - - def __init__( - self, - name: Optional[str] = None, - description: Optional[str] = None, - verbose: bool = True, - api_key: Optional[str] = None, - model_name: str = "gpt-4", - ): - self.name = name or "DefaultSwarm" - self.description = description or "Generic AI Agent Swarm" - self.verbose = verbose - self.agents_pool = [] - self.api_key = api_key or os.getenv("OPENAI_API_KEY") - self.model_name = model_name - - if not self.api_key: - raise ValueError( - "OpenAI API key must be provided either through initialization or environment variable" - ) - - logger.info( - "Initialized AutoSwarmBuilder", - extra={ - "swarm_name": self.name, - "description": self.description, - "model": self.model_name, - }, - ) - - # Initialize OpenAI chat model - try: - self.chat_model = OpenAIChat( - openai_api_key=self.api_key, - model_name=self.model_name, - temperature=0.1, - ) - except Exception as e: - logger.error( - f"Failed to initialize OpenAI chat model: {str(e)}" - ) - raise - - @retry( - stop=stop_after_attempt(3), - wait=wait_exponential(multiplier=1, min=4, max=10), - ) - def run(self, task: str, image_url: Optional[str] = None) -> str: - """Run the swarm on a given task with error handling and retries.""" - if not task or not task.strip(): - raise ValueError("Task cannot be empty") - - logger.info("Starting swarm execution", extra={"task": task}) - - try: - # Create agents for the task - agents = self._create_agents(task, image_url) - if not agents: - raise ValueError( - "No agents were created for the task" - ) - - # Execute the task through the swarm router - logger.info( - "Routing task through swarm", - extra={"num_agents": len(agents)}, - ) - output = self.swarm_router(agents, task, image_url) - - logger.info("Swarm execution completed successfully") - return output - - except Exception as e: - logger.error( - f"Error during swarm execution: {str(e)}", - exc_info=True, - ) - raise - - def _create_agents( - self, task: str, image_url: Optional[str] = None - ) -> List[Agent]: - """Create the necessary agents for a task with enhanced error handling.""" - logger.info("Creating agents for task", extra={"task": task}) - - try: - model = OpenAIFunctionCaller( - system_prompt=BOSS_SYSTEM_PROMPT, - api_key=self.api_key, - temperature=0.1, - base_model=SwarmConfig, - ) - - agents_config = model.run(task) - print(f"{agents_config}") - - if isinstance(agents_config, dict): - agents_config = SwarmConfig(**agents_config) - - # Update swarm configuration - self.name = agents_config.name - self.description = agents_config.description - - # Create agents from configuration - agents = [] - for agent_config in agents_config.agents: - if isinstance(agent_config, dict): - agent_config = AgentConfig(**agent_config) - - agent = self.build_agent( - agent_name=agent_config.name, - agent_description=agent_config.description, - agent_system_prompt=agent_config.system_prompt, - ) - agents.append(agent) - - # Add available agents showcase to system prompts - agents_available = showcase_available_agents( - name=self.name, - description=self.description, - agents=agents, - ) - - for agent in agents: - agent.system_prompt += "\n" + agents_available - - logger.info( - "Successfully created agents", - extra={"num_agents": len(agents)}, - ) - return agents - - except Exception as e: - logger.error( - f"Error creating agents: {str(e)}", exc_info=True - ) - raise - - def build_agent( - self, - agent_name: str, - agent_description: str, - agent_system_prompt: str, - ) -> Agent: - """Build a single agent with enhanced error handling.""" - logger.info( - "Building agent", extra={"agent_name": agent_name} - ) - - try: - agent = Agent( - agent_name=agent_name, - description=agent_description, - system_prompt=agent_system_prompt, - llm=self.chat_model, - autosave=True, - dashboard=False, - verbose=self.verbose, - dynamic_temperature_enabled=True, - saved_state_path=f"states/{agent_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", - user_name="swarms_corp", - retry_attempts=3, - context_length=200000, - return_step_meta=False, - output_type="str", - streaming_on=False, - auto_generate_prompt=True, - ) - return agent - - except Exception as e: - logger.error( - f"Error building agent: {str(e)}", exc_info=True - ) - raise - - @retry( - stop=stop_after_attempt(3), - wait=wait_exponential(multiplier=1, min=4, max=10), - ) - def swarm_router( - self, - agents: List[Agent], - task: str, - image_url: Optional[str] = None, - ) -> str: - """Route tasks between agents in the swarm with error handling and retries.""" - logger.info( - "Initializing swarm router", - extra={"num_agents": len(agents)}, - ) - - try: - swarm_router_instance = SwarmRouter( - name=self.name, - description=self.description, - agents=agents, - swarm_type="auto", - ) - - formatted_task = f"{self.name} {self.description} {task}" - result = swarm_router_instance.run(formatted_task) - - logger.info("Successfully completed swarm routing") - return result - - except Exception as e: - logger.error( - f"Error in swarm router: {str(e)}", exc_info=True - ) - raise - - -swarm = AutoSwarmBuilder( - name="ChipDesign-Swarm", - description="A swarm of specialized AI agents for chip design", - api_key="your-api-key", # Optional if set in environment - model_name="gpt-4", # Optional, defaults to gpt-4 -) - -result = swarm.run( - "Design a new AI accelerator chip optimized for transformer model inference..." -) diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index b9df9157..2f1f380f 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -1,14 +1,13 @@ -from datetime import datetime import asyncio import json import logging import os import random -import sys import threading import time import uuid from concurrent.futures import ThreadPoolExecutor +from datetime import datetime from typing import ( Any, Callable, @@ -22,9 +21,12 @@ from typing import ( import toml import yaml +from loguru import logger from pydantic import BaseModel from swarm_models.tiktoken_wrapper import TikTokenizer + from swarms.agents.ape_agent import auto_generate_prompt +from swarms.artifacts.main_artifact import Artifact from swarms.prompts.agent_system_prompts import AGENT_SYSTEM_PROMPT_3 from swarms.prompts.multi_modal_autonomous_instruction_prompt import ( MULTI_MODAL_AUTO_AGENT_SYSTEM_PROMPT_1, @@ -38,22 +40,19 @@ from swarms.schemas.base_schemas import ( ) from swarms.structs.concat import concat_strings from swarms.structs.conversation import Conversation -from swarms.tools.base_tool import BaseTool -from swarms.tools.func_calling_utils import ( - prepare_output_for_output_model, +from swarms.structs.safe_loading import ( + SafeLoaderUtils, + SafeStateManager, ) +from swarms.tools.base_tool import BaseTool from swarms.tools.tool_parse_exec import parse_and_execute_json from swarms.utils.data_to_text import data_to_text from swarms.utils.file_processing import create_file_in_folder +from swarms.utils.formatter import formatter from swarms.utils.pdf_to_text import pdf_to_text -from swarms.artifacts.main_artifact import Artifact -from swarms.utils.loguru_logger import initialize_logger from swarms.utils.wrapper_clusterop import ( exec_callable_with_clusterops, ) -from swarms.utils.formatter import formatter - -logger = initialize_logger(log_folder="agents") # Utils @@ -136,7 +135,6 @@ class Agent: callback (Callable): The callback function metadata (Dict[str, Any]): The metadata callbacks (List[Callable]): The list of callback functions - logger_handler (Any): The logger handler search_algorithm (Callable): The search algorithm logs_to_filename (str): The filename for the logs evaluator (Callable): The evaluator function @@ -271,7 +269,6 @@ class Agent: callback: Optional[Callable] = None, metadata: Optional[Dict[str, Any]] = None, callbacks: Optional[List[Callable]] = None, - logger_handler: Optional[Any] = sys.stderr, search_algorithm: Optional[Callable] = None, logs_to_filename: Optional[str] = None, evaluator: Optional[Callable] = None, # Custom LLM or agent @@ -297,7 +294,6 @@ class Agent: algorithm_of_thoughts: bool = False, tree_of_thoughts: bool = False, tool_choice: str = "auto", - execute_tool: bool = False, rules: str = None, # type: ignore planning: Optional[str] = False, planning_prompt: Optional[str] = None, @@ -319,7 +315,7 @@ class Agent: use_cases: Optional[List[Dict[str, str]]] = None, step_pool: List[Step] = [], print_every_step: Optional[bool] = False, - time_created: Optional[float] = time.strftime( + time_created: Optional[str] = time.strftime( "%Y-%m-%d %H:%M:%S", time.localtime() ), agent_output: ManySteps = None, @@ -340,6 +336,7 @@ class Agent: all_gpus: bool = False, model_name: str = None, llm_args: dict = None, + load_state_path: str = None, *args, **kwargs, ): @@ -390,7 +387,6 @@ class Agent: self.callback = callback self.metadata = metadata self.callbacks = callbacks - self.logger_handler = logger_handler self.search_algorithm = search_algorithm self.logs_to_filename = logs_to_filename self.evaluator = evaluator @@ -414,7 +410,6 @@ class Agent: self.algorithm_of_thoughts = algorithm_of_thoughts self.tree_of_thoughts = tree_of_thoughts self.tool_choice = tool_choice - self.execute_tool = execute_tool self.planning = planning self.planning_prompt = planning_prompt self.custom_planning_prompt = custom_planning_prompt @@ -457,6 +452,7 @@ class Agent: self.all_gpus = all_gpus self.model_name = model_name self.llm_args = llm_args + self.load_state_path = load_state_path # Initialize the short term memory self.short_memory = Conversation( @@ -502,10 +498,6 @@ class Agent: if preset_stopping_token is not None: self.stopping_token = "" - # # Check the parameters - # # Telemetry Processor to log agent data - # threading.Thread(target=self.agent_initialization()).start - # If the docs exist then ingest the docs if exists(self.docs): threading.Thread( @@ -546,19 +538,6 @@ class Agent: tool.__name__: tool for tool in tools } - # Set the logger handler - if exists(logger_handler): - log_file_path = os.path.join( - self.workspace_dir, f"{self.agent_name}.log" - ) - logger.add( - log_file_path, - level="INFO", - colorize=True, - backtrace=True, - diagnose=True, - ) - # If the tool schema exists or a list of base models exists then convert the tool schema into an openai schema if exists(tool_schema) or exists(list_base_models): threading.Thread( @@ -593,20 +572,23 @@ class Agent: # Telemetry Processor to log agent data threading.Thread(target=self.log_agent_data).start() - threading.Thread(target=self.llm_handling()) + if self.llm is None and self.model_name is not None: + self.llm = self.llm_handling() def llm_handling(self): + from swarms.utils.litellm_wrapper import LiteLLM - if self.llm is None: - from swarms.utils.litellm_wrapper import LiteLLM + if self.llm_args is not None: + llm = LiteLLM(model_name=self.model_name, **self.llm_args) - if self.llm_args is not None: - self.llm = LiteLLM( - model_name=self.model_name, **self.llm_args - ) + else: + llm = LiteLLM( + model_name=self.model_name, + temperature=self.temperature, + max_tokens=self.max_tokens, + ) - else: - self.llm = LiteLLM(model_name=self.model_name) + return llm def check_if_no_prompt_then_autogenerate(self, task: str = None): """ @@ -820,6 +802,9 @@ class Agent: # Print the user's request + if self.autosave: + self.save() + # Print the request if print_task is True: formatter.print_panel( @@ -904,13 +889,6 @@ class Agent: # Check and execute tools if self.tools is not None: self.parse_and_execute_tools(response) - # if tool_result: - # self.update_tool_usage( - # step_meta["step_id"], - # tool_result["tool"], - # tool_result["args"], - # tool_result["response"], - # ) # Add the response to the memory self.short_memory.add( @@ -944,6 +922,12 @@ class Agent: success = True # Mark as successful to exit the retry loop except Exception as e: + + self.log_agent_data() + + if self.autosave is True: + self.save() + logger.error( f"Attempt {attempt+1}: Error generating" f" response: {e}" @@ -951,6 +935,12 @@ class Agent: attempt += 1 if not success: + + self.log_agent_data() + + if self.autosave is True: + self.save() + logger.error( "Failed to generate a valid response after" " retry attempts." @@ -994,8 +984,10 @@ class Agent: time.sleep(self.loop_interval) if self.autosave is True: - logger.info("Autosaving agent state.") - self.save_state() + self.log_agent_data() + + if self.autosave is True: + self.save() # Apply the cleaner function to the response if self.output_cleaner is not None: @@ -1037,10 +1029,9 @@ class Agent: self.artifacts_file_extension, ) - try: - self.log_agent_data() - except Exception: - pass + self.log_agent_data() + if self.autosave is True: + self.save() # More flexible output types if ( @@ -1050,7 +1041,10 @@ class Agent: return concat_strings(all_responses) elif self.output_type == "list": return all_responses - elif self.output_type == "json": + elif ( + self.output_type == "json" + or self.return_step_meta is True + ): return self.agent_output.model_dump_json(indent=4) elif self.output_type == "csv": return self.dict_to_csv( @@ -1062,8 +1056,6 @@ class Agent: return yaml.safe_dump( self.agent_output.model_dump(), sort_keys=False ) - elif self.return_step_meta is True: - return self.agent_output.model_dump_json(indent=4) elif self.return_history is True: history = self.short_memory.get_str() @@ -1077,18 +1069,74 @@ class Agent: ) except Exception as error: - self.log_agent_data() - logger.info( - f"Error running agent: {error} optimize your input parameters" - ) - raise error + self._handle_run_error(error) except KeyboardInterrupt as error: - self.log_agent_data() - logger.info( - f"Error running agent: {error} optimize your input parameters" + self._handle_run_error(error) + + def _handle_run_error(self, error: any): + self.log_agent_data() + + if self.autosave is True: + self.save() + + logger.info( + f"Error detected running your agent {self.agent_name} \n Error {error} \n Optimize your input parameters and or add an issue on the swarms github and contact our team on discord for support ;) " + ) + raise error + + async def arun( + self, + task: Optional[str] = None, + img: Optional[str] = None, + is_last: bool = False, + device: str = "cpu", # gpu + device_id: int = 1, + all_cores: bool = True, + do_not_use_cluster_ops: bool = True, + all_gpus: bool = False, + *args, + **kwargs, + ) -> Any: + """ + Asynchronously runs the agent with the specified parameters. + + Args: + task (Optional[str]): The task to be performed. Defaults to None. + img (Optional[str]): The image to be processed. Defaults to None. + is_last (bool): Indicates if this is the last task. Defaults to False. + device (str): The device to use for execution. Defaults to "cpu". + device_id (int): The ID of the GPU to use if device is set to "gpu". Defaults to 1. + all_cores (bool): If True, uses all available CPU cores. Defaults to True. + do_not_use_cluster_ops (bool): If True, does not use cluster operations. Defaults to True. + all_gpus (bool): If True, uses all available GPUs. Defaults to False. + *args: Additional positional arguments. + **kwargs: Additional keyword arguments. + + Returns: + Any: The result of the asynchronous operation. + + Raises: + Exception: If an error occurs during the asynchronous operation. + """ + try: + return await asyncio.to_thread( + self.run, + task=task, + img=img, + is_last=is_last, + device=device, + device_id=device_id, + all_cores=all_cores, + do_not_use_cluster_ops=do_not_use_cluster_ops, + all_gpus=all_gpus, + *args, + **kwargs, ) - raise error + except Exception as error: + await self._handle_run_error( + error + ) # Ensure this is also async if needed def __call__( self, @@ -1096,8 +1144,10 @@ class Agent: img: Optional[str] = None, is_last: bool = False, device: str = "cpu", # gpu - device_id: int = 0, + device_id: int = 1, all_cores: bool = True, + do_not_use_cluster_ops: bool = True, + all_gpus: bool = False, *args, **kwargs, ) -> Any: @@ -1112,33 +1162,19 @@ class Agent: all_cores (bool): If True, uses all available CPU cores. Defaults to True. """ try: - if task is not None: - return self.run( - task=task, - is_last=is_last, - device=device, - device_id=device_id, - all_cores=all_cores, - *args, - **kwargs, - ) - elif img is not None: - return self.run( - img=img, - is_last=is_last, - device=device, - device_id=device_id, - all_cores=all_cores, - *args, - **kwargs, - ) - else: - raise ValueError( - "Either 'task' or 'img' must be provided." - ) + return self.run( + task=task, + img=img, + is_last=is_last, + device=device, + device_id=device_id, + all_cores=all_cores, + do_not_use_cluster_ops=do_not_use_cluster_ops, + all_gpus=all_gpus * args, + **kwargs, + ) except Exception as error: - logger.error(f"Error calling agent: {error}") - raise error + self._handle_run_error(error) def dict_to_csv(self, data: dict) -> str: """ @@ -1165,33 +1201,31 @@ class Agent: return output.getvalue() def parse_and_execute_tools(self, response: str, *args, **kwargs): - # Try executing the tool - if self.execute_tool is not False: - try: - logger.info("Executing tool...") - - # try to Execute the tool and return a string - out = parse_and_execute_json( - self.tools, - response, - parse_md=True, - *args, - **kwargs, - ) + try: + logger.info("Executing tool...") + + # try to Execute the tool and return a string + out = parse_and_execute_json( + functions=self.tools, + json_string=response, + parse_md=True, + *args, + **kwargs, + ) - out = str(out) + out = str(out) - logger.info(f"Tool Output: {out}") + logger.info(f"Tool Output: {out}") - # Add the output to the memory - self.short_memory.add( - role="Tool Executor", - content=out, - ) + # Add the output to the memory + self.short_memory.add( + role="Tool Executor", + content=out, + ) - except Exception as error: - logger.error(f"Error executing tool: {error}") - raise error + except Exception as error: + logger.error(f"Error executing tool: {error}") + raise error def add_memory(self, message: str): """Add a memory to the agent @@ -1203,6 +1237,7 @@ class Agent: _type_: _description_ """ logger.info(f"Adding memory: {message}") + return self.short_memory.add( role=self.agent_name, content=message ) @@ -1261,7 +1296,9 @@ class Agent: try: logger.info(f"Running concurrent tasks: {tasks}") futures = [ - self.executor.submit(self.run, task, *args, **kwargs) + self.executor.submit( + self.run, task=task, *args, **kwargs + ) for task in tasks ] results = [future.result() for future in futures] @@ -1289,94 +1326,345 @@ class Agent: except Exception as error: logger.info(f"Error running bulk run: {error}", "red") - def save(self) -> None: - """Save the agent history to a file. - - Args: - file_path (_type_): _description_ - """ - file_path = ( - f"{self.saved_state_path}.json" - or f"{self.agent_name}.json" - or f"{self.saved_state_path}.json" - ) + async def arun_batched( + self, + tasks: List[str], + *args, + **kwargs, + ): + """Asynchronously runs a batch of tasks.""" try: - create_file_in_folder( - self.workspace_dir, - file_path, - self.to_json(), - ) - logger.info(f"Saved agent history to: {file_path}") + # Create a list of coroutines for each task + coroutines = [ + self.arun(task=task, *args, **kwargs) + for task in tasks + ] + # Use asyncio.gather to run them concurrently + results = await asyncio.gather(*coroutines) + return results except Exception as error: - logger.error(f"Error saving agent history: {error}") - raise error + logger.error(f"Error running batched tasks: {error}") + raise - def load(self, file_path: str) -> None: + def save(self, file_path: str = None) -> None: """ - Load the agent history from a file, excluding the LLM. + Save the agent state to a file using SafeStateManager with atomic writing + and backup functionality. Automatically handles complex objects and class instances. Args: - file_path (str): The path to the file containing the saved agent history. + file_path (str, optional): Custom path to save the state. + If None, uses configured paths. Raises: - FileNotFoundError: If the specified file path does not exist - json.JSONDecodeError: If the file contains invalid JSON - AttributeError: If there are issues setting agent attributes + OSError: If there are filesystem-related errors Exception: For other unexpected errors """ try: - file_path = ( - f"{self.saved_state_path}.json" - or f"{self.agent_name}.json" - or f"{self.saved_state_path}.json" + # Determine the save path + resolved_path = ( + file_path + or self.saved_state_path + or f"{self.agent_name}_state.json" ) - if not os.path.exists(file_path): - raise FileNotFoundError( - f"File not found at path: {file_path}" + # Ensure path has .json extension + if not resolved_path.endswith(".json"): + resolved_path += ".json" + + # Create full path including workspace directory + full_path = os.path.join( + self.workspace_dir, resolved_path + ) + backup_path = full_path + ".backup" + temp_path = full_path + ".temp" + + # Ensure workspace directory exists + os.makedirs(os.path.dirname(full_path), exist_ok=True) + + # First save to temporary file using SafeStateManager + SafeStateManager.save_state(self, temp_path) + + # If current file exists, create backup + if os.path.exists(full_path): + try: + os.replace(full_path, backup_path) + except Exception as e: + logger.warning(f"Could not create backup: {e}") + + # Move temporary file to final location + os.replace(temp_path, full_path) + + # Clean up old backup if everything succeeded + if os.path.exists(backup_path): + try: + os.remove(backup_path) + except Exception as e: + logger.warning( + f"Could not remove backup file: {e}" + ) + + # Log saved state information if verbose + if self.verbose: + self._log_saved_state_info(full_path) + + logger.info( + f"Successfully saved agent state to: {full_path}" + ) + + # Handle additional component saves + self._save_additional_components(full_path) + + except OSError as e: + logger.error( + f"Filesystem error while saving agent state: {e}" + ) + raise + except Exception as e: + logger.error(f"Unexpected error saving agent state: {e}") + raise + + def _save_additional_components(self, base_path: str) -> None: + """Save additional agent components like memory.""" + try: + # Save long term memory if it exists + if ( + hasattr(self, "long_term_memory") + and self.long_term_memory is not None + ): + memory_path = ( + f"{os.path.splitext(base_path)[0]}_memory.json" ) + try: + self.long_term_memory.save(memory_path) + logger.info( + f"Saved long-term memory to: {memory_path}" + ) + except Exception as e: + logger.warning( + f"Could not save long-term memory: {e}" + ) - with open(file_path, "r") as file: + # Save memory manager if it exists + if ( + hasattr(self, "memory_manager") + and self.memory_manager is not None + ): + manager_path = f"{os.path.splitext(base_path)[0]}_memory_manager.json" try: - data = json.load(file) - except json.JSONDecodeError as e: - logger.error( - f"Invalid JSON in file {file_path}: {str(e)}" + self.memory_manager.save_memory_snapshot( + manager_path + ) + logger.info( + f"Saved memory manager state to: {manager_path}" + ) + except Exception as e: + logger.warning( + f"Could not save memory manager: {e}" ) - raise - if not isinstance(data, dict): - raise ValueError( - f"Expected dict data but got {type(data)}" + except Exception as e: + logger.warning(f"Error saving additional components: {e}") + + def enable_autosave(self, interval: int = 300) -> None: + """ + Enable automatic saving of agent state using SafeStateManager at specified intervals. + + Args: + interval (int): Time between saves in seconds. Defaults to 300 (5 minutes). + """ + + def autosave_loop(): + while self.autosave: + try: + self.save() + if self.verbose: + logger.debug( + f"Autosaved agent state (interval: {interval}s)" + ) + except Exception as e: + logger.error(f"Autosave failed: {e}") + time.sleep(interval) + + self.autosave = True + self.autosave_thread = threading.Thread( + target=autosave_loop, + daemon=True, + name=f"{self.agent_name}_autosave", + ) + self.autosave_thread.start() + logger.info(f"Enabled autosave with {interval}s interval") + + def disable_autosave(self) -> None: + """Disable automatic saving of agent state.""" + if hasattr(self, "autosave"): + self.autosave = False + if hasattr(self, "autosave_thread"): + self.autosave_thread.join(timeout=1) + delattr(self, "autosave_thread") + logger.info("Disabled autosave") + + def cleanup(self) -> None: + """Cleanup method to be called on exit. Ensures final state is saved.""" + try: + if getattr(self, "autosave", False): + logger.info( + "Performing final autosave before exit..." ) + self.disable_autosave() + self.save() + except Exception as e: + logger.error(f"Error during cleanup: {e}") - # Store current LLM - current_llm = self.llm + def load(self, file_path: str = None) -> None: + """ + Load agent state from a file using SafeStateManager. + Automatically preserves class instances and complex objects. - try: - for key, value in data.items(): - if key != "llm": - setattr(self, key, value) - except AttributeError as e: - logger.error( - f"Error setting agent attribute: {str(e)}" + Args: + file_path (str, optional): Path to load state from. + If None, uses default path from agent config. + + Raises: + FileNotFoundError: If state file doesn't exist + Exception: If there's an error during loading + """ + try: + # Resolve load path conditionally with a check for self.load_state_path + resolved_path = ( + file_path + or self.load_state_path + or ( + f"{self.saved_state_path}.json" + if self.saved_state_path + else ( + f"{self.agent_name}.json" + if self.agent_name + else ( + f"{self.workspace_dir}/{self.agent_name}_state.json" + if self.workspace_dir and self.agent_name + else None + ) + ) ) - raise + ) - # Restore LLM - self.llm = current_llm + # Load state using SafeStateManager + SafeStateManager.load_state(self, resolved_path) - logger.info( - f"Successfully loaded agent history from: {file_path}" + # Reinitialize any necessary runtime components + self._reinitialize_after_load() + + if self.verbose: + self._log_loaded_state_info(resolved_path) + + except FileNotFoundError: + logger.error(f"State file not found: {resolved_path}") + raise + except Exception as e: + logger.error(f"Error loading agent state: {e}") + raise + + def _reinitialize_after_load(self) -> None: + """ + Reinitialize necessary components after loading state. + Called automatically after load() to ensure all components are properly set up. + """ + try: + # Reinitialize conversation if needed + if ( + not hasattr(self, "short_memory") + or self.short_memory is None + ): + self.short_memory = Conversation( + system_prompt=self.system_prompt, + time_enabled=True, + user=self.user_name, + rules=self.rules, + ) + + # Reinitialize executor if needed + if not hasattr(self, "executor") or self.executor is None: + self.executor = ThreadPoolExecutor( + max_workers=os.cpu_count() + ) + + # # Reinitialize tool structure if needed + # if hasattr(self, 'tools') and (self.tools or getattr(self, 'list_base_models', None)): + # self.tool_struct = BaseTool( + # tools=self.tools, + # base_models=getattr(self, 'list_base_models', None), + # tool_system_prompt=self.tool_system_prompt + # ) + + except Exception as e: + logger.error(f"Error reinitializing components: {e}") + raise + + def _log_saved_state_info(self, file_path: str) -> None: + """Log information about saved state for debugging""" + try: + state_dict = SafeLoaderUtils.create_state_dict(self) + preserved = SafeLoaderUtils.preserve_instances(self) + + logger.info(f"Saved agent state to: {file_path}") + logger.debug( + f"Saved {len(state_dict)} configuration values" + ) + logger.debug( + f"Preserved {len(preserved)} class instances" ) + if self.verbose: + logger.debug("Preserved instances:") + for name, instance in preserved.items(): + logger.debug( + f" - {name}: {type(instance).__name__}" + ) except Exception as e: - logger.error( - f"Unexpected error loading agent history: {str(e)}" + logger.error(f"Error logging state info: {e}") + + def _log_loaded_state_info(self, file_path: str) -> None: + """Log information about loaded state for debugging""" + try: + state_dict = SafeLoaderUtils.create_state_dict(self) + preserved = SafeLoaderUtils.preserve_instances(self) + + logger.info(f"Loaded agent state from: {file_path}") + logger.debug( + f"Loaded {len(state_dict)} configuration values" + ) + logger.debug( + f"Preserved {len(preserved)} class instances" ) - raise - return None + if self.verbose: + logger.debug("Current class instances:") + for name, instance in preserved.items(): + logger.debug( + f" - {name}: {type(instance).__name__}" + ) + except Exception as e: + logger.error(f"Error logging state info: {e}") + + def get_saveable_state(self) -> Dict[str, Any]: + """ + Get a dictionary of all saveable state values. + Useful for debugging or manual state inspection. + + Returns: + Dict[str, Any]: Dictionary of saveable values + """ + return SafeLoaderUtils.create_state_dict(self) + + def get_preserved_instances(self) -> Dict[str, Any]: + """ + Get a dictionary of all preserved class instances. + Useful for debugging or manual state inspection. + + Returns: + Dict[str, Any]: Dictionary of preserved instances + """ + return SafeLoaderUtils.preserve_instances(self) def graceful_shutdown(self): """Gracefully shutdown the system saving the state""" @@ -1470,24 +1758,6 @@ class Agent: def get_llm_parameters(self): return str(vars(self.llm)) - def save_state(self, *args, **kwargs) -> None: - """ - Saves the current state of the agent to a JSON file, including the llm parameters. - - Args: - file_path (str): The path to the JSON file where the state will be saved. - - Example: - >>> agent.save_state('saved_flow.json') - """ - try: - logger.info(f"Saving Agent {self.agent_name}") - self.save() - logger.info("Saved agent state") - except Exception as error: - logger.error(f"Error saving agent state: {error}") - raise error - def update_system_prompt(self, system_prompt: str): """Upddate the system message""" self.system_prompt = system_prompt @@ -1722,53 +1992,6 @@ class Agent: except Exception as e: print(f"Error occurred during sentiment analysis: {e}") - def count_and_shorten_context_window( - self, history: str, *args, **kwargs - ): - """ - Count the number of tokens in the context window and shorten it if it exceeds the limit. - - Args: - history (str): The history of the conversation. - - Returns: - str: The shortened context window. - """ - # Count the number of tokens in the context window - count = self.tokenizer.count_tokens(history) - - # Shorten the context window if it exceeds the limit, keeping the last n tokens, need to implement the indexing - if count > self.context_length: - history = history[-self.context_length :] - - return history - - def output_cleaner_and_output_type( - self, response: str, *args, **kwargs - ): - """ - Applies the output cleaner function to the response and prepares the output for the output model. - - Args: - response (str): The response to be processed. - - Returns: - str: The processed response. - """ - # Apply the cleaner function to the response - if self.output_cleaner is not None: - logger.info("Applying output cleaner to response.") - response = self.output_cleaner(response) - logger.info(f"Response after output cleaner: {response}") - - # Prepare the output for the output model - if self.output_type is not None: - # logger.info("Preparing output for output model.") - response = prepare_output_for_output_model(response) - print(f"Response after output model: {response}") - - return response - def stream_response( self, response: str, delay: float = 0.001 ) -> None: @@ -1800,37 +2023,6 @@ class Agent: except Exception as e: print(f"An error occurred during streaming: {e}") - def dynamic_context_window(self): - """ - dynamic_context_window essentially clears everything execep - the system prompt and leaves the rest of the contxt window - for RAG query tokens - - """ - # Count the number of tokens in the short term memory - logger.info("Dynamic context window shuffling enabled") - count = self.tokenizer.count_tokens( - self.short_memory.return_history_as_string() - ) - logger.info(f"Number of tokens in memory: {count}") - - # Dynamically allocating everything except the system prompt to be dynamic - # We need to query the short_memory dict, for the system prompt slot - # Then delete everything after that - - if count > self.context_length: - self.short_memory = self.short_memory[ - -self.context_length : - ] - logger.info( - f"Short term memory has been truncated to {self.context_length} tokens" - ) - else: - logger.info("Short term memory is within the limit") - - # Return the memory as a string or update the short term memory - # return memory - def check_available_tokens(self): # Log the amount of tokens left in the memory and in the task if self.tokenizer is not None: @@ -1856,58 +2048,6 @@ class Agent: return out - def truncate_string_by_tokens( - self, input_string: str, limit: int - ) -> str: - """ - Truncate a string if it exceeds a specified number of tokens using a given tokenizer. - - :param input_string: The input string to be tokenized and truncated. - :param tokenizer: The tokenizer function to be used for tokenizing the input string. - :param max_tokens: The maximum number of tokens allowed. - :return: The truncated string if it exceeds the maximum number of tokens; otherwise, the original string. - """ - # Tokenize the input string - tokens = self.tokenizer.count_tokens(input_string) - - # Check if the number of tokens exceeds the maximum limit - if len(tokens) > limit: - # Truncate the tokens to the maximum allowed tokens - truncated_tokens = tokens[: self.context_length] - # Join the truncated tokens back to a string - truncated_string = " ".join(truncated_tokens) - return truncated_string - else: - return input_string - - def tokens_operations(self, input_string: str) -> str: - """ - Perform various operations on tokens of an input string. - - :param input_string: The input string to be processed. - :return: The processed string. - """ - # Tokenize the input string - tokens = self.tokenizer.count_tokens(input_string) - - # Check if the number of tokens exceeds the maximum limit - if len(tokens) > self.context_length: - # Truncate the tokens to the maximum allowed tokens - truncated_tokens = tokens[: self.context_length] - # Join the truncated tokens back to a string - truncated_string = " ".join(truncated_tokens) - return truncated_string - else: - # Log the amount of tokens left in the memory and in the task - if self.tokenizer is not None: - tokens_used = self.tokenizer.count_tokens( - self.short_memory.return_history_as_string() - ) - logger.info( - f"Tokens available: {tokens_used - self.context_length}" - ) - return input_string - def parse_function_call_and_execute(self, response: str): """ Parses a function call from the given response and executes it. @@ -2266,22 +2406,38 @@ class Agent: **kwargs: Arbitrary keyword arguments. Returns: - The result of the method call on the `llm` object. - - """ - # Check if the llm has a __call__, or run, or any other method - if hasattr(self.llm, "__call__"): - return self.llm(task, *args, **kwargs) - elif hasattr(self.llm, "run"): - return self.llm.run(task, *args, **kwargs) - elif hasattr(self.llm, "generate"): - return self.llm.generate(task, *args, **kwargs) - elif hasattr(self.llm, "invoke"): - return self.llm.invoke(task, *args, **kwargs) - else: - raise AttributeError( - "No suitable method found in the llm object." - ) + str: The result of the method call on the `llm` object. + + Raises: + AttributeError: If no suitable method is found in the llm object. + TypeError: If task is not a string or llm object is None. + ValueError: If task is empty. + """ + if not isinstance(task, str): + raise TypeError("Task must be a string") + + if not task.strip(): + raise ValueError("Task cannot be empty") + + if self.llm is None: + raise TypeError("LLM object cannot be None") + + # Define common method names for LLM interfaces + method_names = ["run", "__call__", "generate", "invoke"] + + for method_name in method_names: + if hasattr(self.llm, method_name): + try: + method = getattr(self.llm, method_name) + return method(task, *args, **kwargs) + except Exception as e: + raise RuntimeError( + f"Error calling {method_name}: {str(e)}" + ) + + raise AttributeError( + f"No suitable method found in the llm object. Expected one of: {method_names}" + ) def handle_sop_ops(self): # If the user inputs a list of strings for the sop then join them and set the sop @@ -2306,9 +2462,8 @@ class Agent: device_id: Optional[int] = 0, all_cores: Optional[bool] = True, scheduled_run_date: Optional[datetime] = None, - do_not_use_cluster_ops: Optional[bool] = False, + do_not_use_cluster_ops: Optional[bool] = True, all_gpus: Optional[bool] = False, - generate_speech: Optional[bool] = False, *args, **kwargs, ) -> Any: @@ -2341,6 +2496,7 @@ class Agent: device_id = device_id or self.device_id all_cores = all_cores or self.all_cores all_gpus = all_gpus or self.all_gpus + do_not_use_cluster_ops = ( do_not_use_cluster_ops or self.do_not_use_cluster_ops ) @@ -2358,7 +2514,7 @@ class Agent: return self._run( task=task, img=img, - generate_speech=generate_speech * args, + *args, **kwargs, ) @@ -2371,17 +2527,15 @@ class Agent: func=self._run, task=task, img=img, - generate_speech=generate_speech, *args, **kwargs, ) except ValueError as e: - logger.error(f"Invalid device specified: {e}") - raise e + self._handle_run_error(e) + except Exception as e: - logger.error(f"An error occurred during execution: {e}") - raise e + self._handle_run_error(e) def handle_artifacts( self, text: str, file_output_path: str, file_extension: str @@ -2389,8 +2543,8 @@ class Agent: """Handle creating and saving artifacts with error handling.""" try: # Ensure file_extension starts with a dot - if not file_extension.startswith('.'): - file_extension = '.' + file_extension + if not file_extension.startswith("."): + file_extension = "." + file_extension # If file_output_path doesn't have an extension, treat it as a directory # and create a default filename based on timestamp @@ -2412,18 +2566,26 @@ class Agent: edit_count=0, ) - logger.info(f"Saving artifact with extension: {file_extension}") + logger.info( + f"Saving artifact with extension: {file_extension}" + ) artifact.save_as(file_extension) - logger.success(f"Successfully saved artifact to {full_path}") + logger.success( + f"Successfully saved artifact to {full_path}" + ) except ValueError as e: - logger.error(f"Invalid input values for artifact: {str(e)}") + logger.error( + f"Invalid input values for artifact: {str(e)}" + ) raise except IOError as e: logger.error(f"Error saving artifact to file: {str(e)}") raise except Exception as e: - logger.error(f"Unexpected error handling artifact: {str(e)}") + logger.error( + f"Unexpected error handling artifact: {str(e)}" + ) raise def showcase_config(self):