diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 7695af97..f33f78e7 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -210,6 +210,7 @@ nav: - XAI: "swarms/examples/xai.md" - Swarms Tools: - Agent with HTX + CoinGecko: "swarms/examples/swarms_tools_htx.md" + - Agent with HTX + CoinGecko Function Calling: "swarms/examples/swarms_tools_htx_gecko.md" - Swarm Models: - Overview: "swarms/models/index.md" # - Models Available: "swarms/models/index.md" diff --git a/docs/swarms/examples/swarms_tools_htx_gecko.md b/docs/swarms/examples/swarms_tools_htx_gecko.md new file mode 100644 index 00000000..9f554c53 --- /dev/null +++ b/docs/swarms/examples/swarms_tools_htx_gecko.md @@ -0,0 +1,43 @@ +# Swarms Tools Example with HTX + CoinGecko + +- `pip3 install swarms swarms-tools` +- Add `OPENAI_API_KEY` to your `.env` file +- Run `swarms_tools_htx_gecko.py` +- Agent will make a function call to the desired tool +- The tool will be executed and the result will be returned to the agent +- The agent will then analyze the result and return the final output + + +```python +from swarms import Agent +from swarms.prompts.finance_agent_sys_prompt import ( + FINANCIAL_AGENT_SYS_PROMPT, +) +from swarms_tools import ( + fetch_stock_news, + coin_gecko_coin_api, + fetch_htx_data, +) + +# Initialize the agent +agent = Agent( + agent_name="Financial-Analysis-Agent", + agent_description="Personal finance advisor agent", + system_prompt=FINANCIAL_AGENT_SYS_PROMPT, + max_loops=1, + model_name="gpt-4o", + dynamic_temperature_enabled=True, + user_name="swarms_corp", + retry_attempts=3, + context_length=8192, + return_step_meta=False, + output_type="str", # "json", "dict", "csv" OR "string" "yaml" and + auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task + max_tokens=4000, # max output tokens + saved_state_path="agent_00.json", + interactive=False, + tools=[fetch_stock_news, coin_gecko_coin_api, fetch_htx_data], +) + +agent.run("Analyze the $swarms token on htx") +``` \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 0791dfdd..ebf348aa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "6.8.8" +version = "6.9.6" description = "Swarms - TGSC" license = "MIT" authors = ["Kye Gomez "] diff --git a/swarms/agents/agent_print.py b/swarms/agents/agent_print.py new file mode 100644 index 00000000..e0969dc6 --- /dev/null +++ b/swarms/agents/agent_print.py @@ -0,0 +1,33 @@ +from swarms.utils.formatter import formatter + + +def agent_print( + agent_name: str, + response: str = None, + loop_count: int = None, + streaming_on: bool = False, +): + """ + Prints the response from an agent based on the streaming mode. + + Args: + agent_name (str): The name of the agent. + response (str): The response from the agent. + loop_count (int): The maximum number of loops. + streaming_on (bool): Indicates if streaming is on or off. + + Returns: + str: The response from the agent. + """ + if streaming_on: + formatter.print_panel_token_by_token( + f"{agent_name}: {response}", + title=f"Agent Name: {agent_name} [Max Loops: {loop_count}]", + ) + else: + formatter.print_panel( + f"{agent_name}: {response}", + f"Agent Name {agent_name} [Max Loops: {loop_count} ]", + ) + + return response diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index f4436988..16bf974e 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -1,5 +1,6 @@ from swarms.structs.agent import Agent from swarms.structs.agents_available import showcase_available_agents +from swarms.structs.async_workflow import AsyncWorkflow from swarms.structs.auto_swarm import AutoSwarm, AutoSwarmRouter from swarms.structs.base_structure import BaseStructure from swarms.structs.base_swarm import BaseSwarm @@ -13,10 +14,10 @@ from swarms.structs.graph_workflow import ( NodeType, ) from swarms.structs.groupchat import ( - GroupChat, + AgentResponse, ChatHistory, ChatTurn, - AgentResponse, + GroupChat, expertise_based, ) from swarms.structs.majority_voting import ( @@ -38,6 +39,7 @@ from swarms.structs.multi_agent_exec import ( run_agents_with_tasks_concurrently, run_single_agent, ) +from swarms.structs.multi_agent_orchestrator import MultiAgentRouter from swarms.structs.queue_swarm import TaskQueueSwarm from swarms.structs.rearrange import AgentRearrange, rearrange from swarms.structs.round_robin import RoundRobinSwarm @@ -79,8 +81,6 @@ from swarms.structs.utils import ( find_token_in_text, parse_tasks, ) -from swarms.structs.async_workflow import AsyncWorkflow -from swarms.structs.multi_agent_orchestrator import MultiAgentRouter __all__ = [ "Agent", diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 97fe3f3d..00a0420d 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -54,6 +54,7 @@ from swarms.utils.wrapper_clusterop import ( exec_callable_with_clusterops, ) from swarms.telemetry.capture_sys_data import log_agent_data +from swarms.agents.agent_print import agent_print # Utils @@ -889,7 +890,33 @@ class Agent: # Check and execute tools if self.tools is not None: - self.parse_and_execute_tools(response) + out = self.parse_and_execute_tools( + response + ) + + self.short_memory.add( + role="Tool Executor", content=out + ) + + agent_print( + f"{self.agent_name} - Tool Executor", + out, + loop_count, + self.streaming_on, + ) + + out = self.llm.run(out) + + agent_print( + f"{self.agent_name} - Agent Analysis", + out, + loop_count, + self.streaming_on, + ) + + self.short_memory.add( + role=self.agent_name, content=out + ) # Add the response to the memory self.short_memory.add( @@ -1209,31 +1236,35 @@ class Agent: return output.getvalue() def parse_and_execute_tools(self, response: str, *args, **kwargs): - try: - logger.info("Executing tool...") - - # try to Execute the tool and return a string - out = parse_and_execute_json( - functions=self.tools, - json_string=response, - parse_md=True, - *args, - **kwargs, - ) - - out = str(out) - - logger.info(f"Tool Output: {out}") - - # Add the output to the memory - self.short_memory.add( - role="Tool Executor", - content=out, - ) + max_retries = 3 # Maximum number of retries + retries = 0 + while retries < max_retries: + try: + logger.info("Executing tool...") - except Exception as error: - logger.error(f"Error executing tool: {error}") - raise error + # try to Execute the tool and return a string + out = parse_and_execute_json( + functions=self.tools, + json_string=response, + parse_md=True, + *args, + **kwargs, + ) + logger.info(f"Tool Output: {out}") + # Add the output to the memory + # self.short_memory.add( + # role="Tool Executor", + # content=out, + # ) + return out + except Exception as error: + retries += 1 + logger.error( + f"Attempt {retries}: Error executing tool: {error}" + ) + if retries == max_retries: + raise error + time.sleep(1) # Wait for a bit before retrying def add_memory(self, message: str): """Add a memory to the agent @@ -2056,45 +2087,6 @@ class Agent: return out - def parse_function_call_and_execute(self, response: str): - """ - Parses a function call from the given response and executes it. - - Args: - response (str): The response containing the function call. - - Returns: - None - - Raises: - Exception: If there is an error parsing and executing the function call. - """ - try: - if self.tools is not None: - tool_call_output = parse_and_execute_json( - self.tools, response, parse_md=True - ) - - if tool_call_output is not str: - tool_call_output = str(tool_call_output) - - logger.info(f"Tool Call Output: {tool_call_output}") - self.short_memory.add( - role=self.agent_name, - content=tool_call_output, - ) - - return tool_call_output - except Exception as error: - logger.error( - f"Error parsing and executing function call: {error}" - ) - - # Raise a custom exception with the error message - raise Exception( - "Error parsing and executing function call" - ) from error - def activate_agentops(self): if self.agent_ops_on is True: try: diff --git a/swarms/structs/agent_security.py b/swarms/structs/agent_security.py new file mode 100644 index 00000000..8e588acf --- /dev/null +++ b/swarms/structs/agent_security.py @@ -0,0 +1,318 @@ +import base64 +import json +import uuid +from datetime import datetime +from dataclasses import dataclass +from typing import Optional, Union, Dict, List + +from cryptography.fernet import Fernet +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding, rsa +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC + + +@dataclass +class EncryptedMessage: + """Structure for encrypted messages between agents""" + + sender_id: str + receiver_id: str + encrypted_content: bytes + timestamp: float + message_id: str + session_id: str + + +class EncryptionSession: + """Represents an encrypted communication session between agents""" + + def __init__( + self, + session_id: str, + agent_ids: List[str], + encrypted_keys: Dict[str, bytes], + created_at: datetime, + ): + self.session_id = session_id + self.agent_ids = agent_ids + self.encrypted_keys = encrypted_keys + self.created_at = created_at + + +class AgentEncryption: + """ + Handles encryption for agent data both at rest and in transit. + Supports both symmetric (for data at rest) and asymmetric (for data in transit) encryption. + Also supports secure multi-agent communication. + """ + + def __init__( + self, + agent_id: Optional[str] = None, + encryption_key: Optional[str] = None, + enable_transit_encryption: bool = False, + enable_rest_encryption: bool = False, + enable_multi_agent: bool = False, + ): + self.agent_id = agent_id or str(uuid.uuid4()) + self.enable_transit_encryption = enable_transit_encryption + self.enable_rest_encryption = enable_rest_encryption + self.enable_multi_agent = enable_multi_agent + + # Multi-agent communication storage + self.sessions: Dict[str, EncryptionSession] = {} + self.known_agents: Dict[str, "AgentEncryption"] = {} + + if enable_rest_encryption: + # Initialize encryption for data at rest + if encryption_key: + self.encryption_key = base64.urlsafe_b64encode( + PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=32, + salt=f"agent_{self.agent_id}".encode(), # Unique salt per agent + iterations=100000, + ).derive(encryption_key.encode()) + ) + else: + self.encryption_key = Fernet.generate_key() + + self.cipher_suite = Fernet(self.encryption_key) + + if enable_transit_encryption or enable_multi_agent: + # Generate RSA key pair for transit encryption + self.private_key = rsa.generate_private_key( + public_exponent=65537, key_size=2048 + ) + self.public_key = self.private_key.public_key() + + def register_agent( + self, agent_id: str, agent_encryption: "AgentEncryption" + ) -> None: + """Register another agent for secure communication""" + if not self.enable_multi_agent: + raise ValueError("Multi-agent support is not enabled") + self.known_agents[agent_id] = agent_encryption + + def create_session(self, agent_ids: List[str]) -> str: + """Create a new encrypted session between multiple agents""" + if not self.enable_multi_agent: + raise ValueError("Multi-agent support is not enabled") + + session_id = str(uuid.uuid4()) + + # Generate a shared session key + session_key = Fernet.generate_key() + + # Create encrypted copies of the session key for each agent + encrypted_keys = {} + for agent_id in agent_ids: + if ( + agent_id not in self.known_agents + and agent_id != self.agent_id + ): + raise ValueError(f"Agent {agent_id} not registered") + + if agent_id == self.agent_id: + agent_public_key = self.public_key + else: + agent_public_key = self.known_agents[ + agent_id + ].public_key + + encrypted_key = agent_public_key.encrypt( + session_key, + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None, + ), + ) + encrypted_keys[agent_id] = encrypted_key + + # Store session information + self.sessions[session_id] = EncryptionSession( + session_id=session_id, + agent_ids=agent_ids, + encrypted_keys=encrypted_keys, + created_at=datetime.now(), + ) + + return session_id + + def encrypt_message( + self, + content: Union[str, dict], + receiver_id: str, + session_id: str, + ) -> EncryptedMessage: + """Encrypt a message for another agent within a session""" + if not self.enable_multi_agent: + raise ValueError("Multi-agent support is not enabled") + + if session_id not in self.sessions: + raise ValueError("Invalid session ID") + + session = self.sessions[session_id] + if ( + self.agent_id not in session.agent_ids + or receiver_id not in session.agent_ids + ): + raise ValueError("Sender or receiver not in session") + + # Serialize content if it's a dictionary + if isinstance(content, dict): + content = json.dumps(content) + + # Get the session key + encrypted_session_key = session.encrypted_keys[self.agent_id] + session_key = self.decrypt_session_key(encrypted_session_key) + + # Create Fernet cipher with session key + cipher = Fernet(session_key) + + # Encrypt the message + encrypted_content = cipher.encrypt(content.encode()) + + return EncryptedMessage( + sender_id=self.agent_id, + receiver_id=receiver_id, + encrypted_content=encrypted_content, + timestamp=datetime.now().timestamp(), + message_id=str(uuid.uuid4()), + session_id=session_id, + ) + + def decrypt_message( + self, message: EncryptedMessage + ) -> Union[str, dict]: + """Decrypt a message from another agent""" + if not self.enable_multi_agent: + raise ValueError("Multi-agent support is not enabled") + + if message.session_id not in self.sessions: + raise ValueError("Invalid session ID") + + if self.agent_id != message.receiver_id: + raise ValueError("Message not intended for this agent") + + session = self.sessions[message.session_id] + + # Get the session key + encrypted_session_key = session.encrypted_keys[self.agent_id] + session_key = self.decrypt_session_key(encrypted_session_key) + + # Create Fernet cipher with session key + cipher = Fernet(session_key) + + # Decrypt the message + decrypted_content = cipher.decrypt( + message.encrypted_content + ).decode() + + # Try to parse as JSON + try: + return json.loads(decrypted_content) + except json.JSONDecodeError: + return decrypted_content + + def decrypt_session_key(self, encrypted_key: bytes) -> bytes: + """Decrypt a session key using the agent's private key""" + return self.private_key.decrypt( + encrypted_key, + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None, + ), + ) + + # Original methods preserved below + def encrypt_at_rest(self, data: Union[str, dict, bytes]) -> bytes: + """Encrypts data for storage""" + if not self.enable_rest_encryption: + return ( + data + if isinstance(data, bytes) + else str(data).encode() + ) + + if isinstance(data, dict): + data = json.dumps(data) + if isinstance(data, str): + data = data.encode() + + return self.cipher_suite.encrypt(data) + + def decrypt_at_rest( + self, encrypted_data: bytes + ) -> Union[str, dict]: + """Decrypts stored data""" + if not self.enable_rest_encryption: + return encrypted_data.decode() + + decrypted_data = self.cipher_suite.decrypt(encrypted_data) + + try: + return json.loads(decrypted_data) + except json.JSONDecodeError: + return decrypted_data.decode() + + def encrypt_for_transit(self, data: Union[str, dict]) -> bytes: + """Encrypts data for transmission""" + if not self.enable_transit_encryption: + return str(data).encode() + + if isinstance(data, dict): + data = json.dumps(data) + + return self.public_key.encrypt( + data.encode(), + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None, + ), + ) + + def decrypt_from_transit( + self, data: Union[bytes, str] + ) -> Union[str, dict]: + """Decrypts received data, handling both encrypted and unencrypted inputs""" + if not self.enable_transit_encryption: + return data.decode() if isinstance(data, bytes) else data + + try: + if isinstance(data, bytes) and len(data) == 256: + decrypted_data = self.private_key.decrypt( + data, + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None, + ), + ).decode() + else: + return ( + data.decode() if isinstance(data, bytes) else data + ) + + try: + return json.loads(decrypted_data) + except json.JSONDecodeError: + return decrypted_data + except ValueError: + return data.decode() if isinstance(data, bytes) else data + + def get_public_key_pem(self) -> bytes: + """Returns the public key in PEM format for sharing""" + if ( + not self.enable_transit_encryption + and not self.enable_multi_agent + ): + return b"" + + return self.public_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) diff --git a/swarms/structs/swarm_router.py b/swarms/structs/swarm_router.py index a0f73a98..e0ca4d05 100644 --- a/swarms/structs/swarm_router.py +++ b/swarms/structs/swarm_router.py @@ -12,6 +12,8 @@ from swarms.structs.mixture_of_agents import MixtureOfAgents from swarms.structs.rearrange import AgentRearrange from swarms.structs.sequential_workflow import SequentialWorkflow from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm +from swarms.structs.groupchat import GroupChat +from swarms.structs.multi_agent_orchestrator import MultiAgentRouter from swarms.structs.swarm_matcher import swarm_matcher from swarms.utils.wrapper_clusterop import ( exec_callable_with_clusterops, @@ -26,6 +28,8 @@ SwarmType = Literal[ "SpreadSheetSwarm", "SequentialWorkflow", "ConcurrentWorkflow", + "GroupChat", + "MultiAgentRouter", "auto", ] @@ -137,6 +141,7 @@ class SwarmRouter: documents: List[str] = [], # A list of docs file paths output_type: str = "string", # Md, PDF, Txt, csv no_cluster_ops: bool = False, + speaker_fn: callable = None, *args, **kwargs, ): @@ -154,6 +159,7 @@ class SwarmRouter: self.documents = documents self.output_type = output_type self.no_cluster_ops = no_cluster_ops + self.speaker_fn = speaker_fn self.logs = [] self.reliability_check() @@ -174,8 +180,6 @@ class SwarmRouter: if self.rules is not None: self.handle_rules() - # let's make a function that checks the agents parameter and disables clusterops - def deactivate_clusterops(self): for agent in self.agents: agent.do_not_use_cluster_ops = True @@ -295,6 +299,26 @@ class SwarmRouter: *args, **kwargs, ) + + elif self.swarm_type == "GroupChat": + return GroupChat( + name=self.name, + description=self.description, + agents=self.agents, + max_loops=self.max_loops, + speaker_fn=self.speaker_fn, + *args, + **kwargs, + ) + + elif self.swarm_type == "MultiAgentRouter": + return MultiAgentRouter( + name=self.name, + description=self.description, + agents=self.agents, + shared_memory_system=self.shared_memory_system, + output_type=self.output_type, + ) elif self.swarm_type == "SpreadSheetSwarm": return SpreadSheetSwarm( name=self.name, diff --git a/swarms/tools/tool_parse_exec.py b/swarms/tools/tool_parse_exec.py index 7cc4369f..f118319d 100644 --- a/swarms/tools/tool_parse_exec.py +++ b/swarms/tools/tool_parse_exec.py @@ -12,17 +12,19 @@ def parse_and_execute_json( json_string: str, parse_md: bool = False, verbose: bool = False, - return_str: bool = True, -) -> dict: + max_retries: int = 3, +) -> str: """ Parses and executes a JSON string containing function names and parameters. Args: - functions (List[callable]): A list of callable functions. + functions (List[Callable[..., Any]]): A list of callable functions. json_string (str): The JSON string to parse and execute. parse_md (bool): Flag indicating whether to extract code from Markdown. verbose (bool): Flag indicating whether to enable verbose logging. return_str (bool): Flag indicating whether to return a JSON string. + max_retries (int): Maximum number of retries for executing functions. + Returns: dict: A dictionary containing the results of executing the functions with the parsed parameters. """ @@ -30,10 +32,20 @@ def parse_and_execute_json( raise ValueError("Functions and JSON string are required") if parse_md: - json_string = extract_code_from_markdown(json_string) + try: + json_string = extract_code_from_markdown(json_string) + except Exception as e: + logger.error(f"Error extracting code from Markdown: {e}") + return {"error": f"Markdown parsing failed: {str(e)}"} try: - # Create function name to function mapping + # Ensure JSON string is stripped of extraneous whitespace + json_string = json_string.strip() + if not json_string: + raise ValueError( + "JSON string is empty after stripping whitespace" + ) + function_dict = {func.__name__: func for func in functions} if verbose: @@ -42,83 +54,80 @@ def parse_and_execute_json( ) logger.info(f"Processing JSON: {json_string}") - # Parse JSON data - data = json.loads(json_string) + try: + data = json.loads(json_string) + except json.JSONDecodeError as e: + logger.error(f"Invalid JSON format: {e}") + return {"error": f"Invalid JSON format: {str(e)}"} - # Handle both single function and function list formats function_list = [] if "functions" in data: function_list = data["functions"] elif "function" in data: function_list = [data["function"]] else: - function_list = [ - data - ] # Assume entire object is single function + function_list = [data] - # Ensure function_list is a list and filter None values if isinstance(function_list, dict): function_list = [function_list] + function_list = [f for f in function_list if f] if verbose: logger.info(f"Processing {len(function_list)} functions") results = {} + for function_data in function_list: function_name = function_data.get("name") parameters = function_data.get("parameters", {}) if not function_name: - logger.warning("Function data missing name field") + logger.warning("Function data missing 'name' field") continue if verbose: logger.info( - f"Executing {function_name} with params: {parameters}" + f"Executing {function_name} with parameters: {parameters}" ) if function_name not in function_dict: - logger.warning(f"Function {function_name} not found") - results[function_name] = None + logger.warning( + f"Function '{function_name}' not found" + ) + results[function_name] = "Error: Function not found" continue - try: - result = function_dict[function_name](**parameters) - results[function_name] = str(result) - if verbose: - logger.info( - f"Result for {function_name}: {result}" + for attempt in range(max_retries): + try: + result = function_dict[function_name]( + **parameters ) - except Exception as e: - logger.error( - f"Error executing {function_name}: {str(e)}" - ) - results[function_name] = f"Error: {str(e)}" + results[function_name] = str(result) + if verbose: + logger.info( + f"Result for {function_name}: {result}" + ) + break + except Exception as e: + logger.error( + f"Attempt {attempt + 1} failed for {function_name}: {e}" + ) + if attempt == max_retries - 1: + results[function_name] = ( + f"Error after {max_retries} attempts: {str(e)}" + ) - # Format final results - if len(results) == 1: - # Return single result directly - data = {"result": next(iter(results.values()))} - else: - # Return all results - data = { - "results": results, - "summary": "\n".join( - f"{k}: {v}" for k, v in results.items() - ), - } - - if return_str: - return json.dumps(data) - else: - return data + data = { + "results": results, + "summary": "\n".join( + f"{k}: {v}" for k, v in results.items() + ), + } + + return json.dumps(data, indent=4) - except json.JSONDecodeError as e: - error = f"Invalid JSON format: {str(e)}" - logger.error(error) - return {"error": error} except Exception as e: - error = f"Error parsing and executing JSON: {str(e)}" + error = f"Unexpected error during execution: {str(e)}" logger.error(error) return {"error": error} diff --git a/swarms_tool_example_simple.py b/swarms_tool_example_simple.py new file mode 100644 index 00000000..96311883 --- /dev/null +++ b/swarms_tool_example_simple.py @@ -0,0 +1,30 @@ +from swarms import Agent +from swarms.prompts.finance_agent_sys_prompt import ( + FINANCIAL_AGENT_SYS_PROMPT, +) +from swarms_tools import ( + coin_gecko_coin_api, + fetch_htx_data, +) + + +# Initialize the agent +agent = Agent( + agent_name="Financial-Analysis-Agent", + agent_description="Personal finance advisor agent", + system_prompt=FINANCIAL_AGENT_SYS_PROMPT, + max_loops=1, + model_name="gpt-4o", + dynamic_temperature_enabled=True, + user_name="swarms_corp", + return_step_meta=False, + output_type="str", # "json", "dict", "csv" OR "string" "yaml" and + auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task + max_tokens=4000, # max output tokens + saved_state_path="agent_00.json", + interactive=False, +) + +agent.run( + f"Analyze the $swarms token on HTX with data: {fetch_htx_data('swarms')}. Additionally, consider the following CoinGecko data: {coin_gecko_coin_api('swarms')}" +) diff --git a/swarms_tools_example.py b/swarms_tools_example.py index 96311883..9171bb30 100644 --- a/swarms_tools_example.py +++ b/swarms_tools_example.py @@ -3,11 +3,11 @@ from swarms.prompts.finance_agent_sys_prompt import ( FINANCIAL_AGENT_SYS_PROMPT, ) from swarms_tools import ( + fetch_stock_news, coin_gecko_coin_api, fetch_htx_data, ) - # Initialize the agent agent = Agent( agent_name="Financial-Analysis-Agent", @@ -17,14 +17,15 @@ agent = Agent( model_name="gpt-4o", dynamic_temperature_enabled=True, user_name="swarms_corp", + retry_attempts=3, + context_length=8192, return_step_meta=False, output_type="str", # "json", "dict", "csv" OR "string" "yaml" and auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task max_tokens=4000, # max output tokens saved_state_path="agent_00.json", interactive=False, + tools=[fetch_stock_news, coin_gecko_coin_api, fetch_htx_data], ) -agent.run( - f"Analyze the $swarms token on HTX with data: {fetch_htx_data('swarms')}. Additionally, consider the following CoinGecko data: {coin_gecko_coin_api('swarms')}" -) +agent.run("Analyze the $swarms token on htx") diff --git a/tests/tools/test_parse_tools.py b/tests/tools/test_parse_tools.py new file mode 100644 index 00000000..ef65dddd --- /dev/null +++ b/tests/tools/test_parse_tools.py @@ -0,0 +1,92 @@ +# Define a simple testing framework +from swarms.tools.tool_parse_exec import parse_and_execute_json + + +def run_test(test_name, test_func): + print(f"Running test: {test_name}") + print("------------------------------------------------") + try: + test_func() + print(f"✓ {test_name} passed") + print("------------------------------------------------") + except Exception as e: + print(f"✗ {test_name} failed: {str(e)}") + print("------------------------------------------------") + + +# Mock functions for testing +def mock_function_a(param1, param2): + return param1 + param2 + + +def mock_function_b(param1): + if param1 < 0: + raise ValueError("Negative value not allowed") + return param1 * 2 + + +# Test cases +def test_parse_and_execute_json_success(): + functions = [mock_function_a, mock_function_b] + json_string = '{"functions": [{"name": "mock_function_a", "parameters": {"param1": 1, "param2": 2}}, {"name": "mock_function_b", "parameters": {"param1": 3}}]}' + + result = parse_and_execute_json(functions, json_string) + expected_result = { + "results": {"mock_function_a": "3", "mock_function_b": "6"}, + "summary": "mock_function_a: 3\nmock_function_b: 6", + } + + assert ( + result == expected_result + ), f"Expected {expected_result}, but got {result}" + + +def test_parse_and_execute_json_function_not_found(): + functions = [mock_function_a] + json_string = '{"functions": [{"name": "non_existent_function", "parameters": {}}]}' + + result = parse_and_execute_json(functions, json_string) + expected_result = { + "results": { + "non_existent_function": "Error: Function non_existent_function not found" + }, + "summary": "non_existent_function: Error: Function non_existent_function not found", + } + + assert ( + result == expected_result + ), f"Expected {expected_result}, but got {result}" + + +def test_parse_and_execute_json_error_handling(): + functions = [mock_function_b] + json_string = '{"functions": [{"name": "mock_function_b", "parameters": {"param1": -1}}]}' + + result = parse_and_execute_json(functions, json_string) + expected_result = { + "results": { + "mock_function_b": "Error: Negative value not allowed" + }, + "summary": "mock_function_b: Error: Negative value not allowed", + } + + assert ( + result == expected_result + ), f"Expected {expected_result}, but got {result}" + + +# Run tests +run_test( + "Test parse_and_execute_json success", + test_parse_and_execute_json_success, +) +print("------------------------------------------------") +run_test( + "Test parse_and_execute_json function not found", + test_parse_and_execute_json_function_not_found, +) +print("------------------------------------------------") +run_test( + "Test parse_and_execute_json error handling", + test_parse_and_execute_json_error_handling, +)