[DOCS][DOCS]

pull/758/head
Kye Gomez 2 weeks ago
parent 5e4a600d47
commit d44fbcd700

@ -210,6 +210,7 @@ nav:
- XAI: "swarms/examples/xai.md" - XAI: "swarms/examples/xai.md"
- Swarms Tools: - Swarms Tools:
- Agent with HTX + CoinGecko: "swarms/examples/swarms_tools_htx.md" - Agent with HTX + CoinGecko: "swarms/examples/swarms_tools_htx.md"
- Agent with HTX + CoinGecko Function Calling: "swarms/examples/swarms_tools_htx_gecko.md"
- Swarm Models: - Swarm Models:
- Overview: "swarms/models/index.md" - Overview: "swarms/models/index.md"
# - Models Available: "swarms/models/index.md" # - Models Available: "swarms/models/index.md"

@ -0,0 +1,43 @@
# Swarms Tools Example with HTX + CoinGecko
- `pip3 install swarms swarms-tools`
- Add `OPENAI_API_KEY` to your `.env` file
- Run `swarms_tools_htx_gecko.py`
- Agent will make a function call to the desired tool
- The tool will be executed and the result will be returned to the agent
- The agent will then analyze the result and return the final output
```python
from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT,
)
from swarms_tools import (
fetch_stock_news,
coin_gecko_coin_api,
fetch_htx_data,
)
# Initialize the agent
agent = Agent(
agent_name="Financial-Analysis-Agent",
agent_description="Personal finance advisor agent",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
max_loops=1,
model_name="gpt-4o",
dynamic_temperature_enabled=True,
user_name="swarms_corp",
retry_attempts=3,
context_length=8192,
return_step_meta=False,
output_type="str", # "json", "dict", "csv" OR "string" "yaml" and
auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task
max_tokens=4000, # max output tokens
saved_state_path="agent_00.json",
interactive=False,
tools=[fetch_stock_news, coin_gecko_coin_api, fetch_htx_data],
)
agent.run("Analyze the $swarms token on htx")
```

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry] [tool.poetry]
name = "swarms" name = "swarms"
version = "6.8.8" version = "6.9.6"
description = "Swarms - TGSC" description = "Swarms - TGSC"
license = "MIT" license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"] authors = ["Kye Gomez <kye@apac.ai>"]

@ -0,0 +1,33 @@
from swarms.utils.formatter import formatter
def agent_print(
agent_name: str,
response: str = None,
loop_count: int = None,
streaming_on: bool = False,
):
"""
Prints the response from an agent based on the streaming mode.
Args:
agent_name (str): The name of the agent.
response (str): The response from the agent.
loop_count (int): The maximum number of loops.
streaming_on (bool): Indicates if streaming is on or off.
Returns:
str: The response from the agent.
"""
if streaming_on:
formatter.print_panel_token_by_token(
f"{agent_name}: {response}",
title=f"Agent Name: {agent_name} [Max Loops: {loop_count}]",
)
else:
formatter.print_panel(
f"{agent_name}: {response}",
f"Agent Name {agent_name} [Max Loops: {loop_count} ]",
)
return response

@ -1,5 +1,6 @@
from swarms.structs.agent import Agent from swarms.structs.agent import Agent
from swarms.structs.agents_available import showcase_available_agents from swarms.structs.agents_available import showcase_available_agents
from swarms.structs.async_workflow import AsyncWorkflow
from swarms.structs.auto_swarm import AutoSwarm, AutoSwarmRouter from swarms.structs.auto_swarm import AutoSwarm, AutoSwarmRouter
from swarms.structs.base_structure import BaseStructure from swarms.structs.base_structure import BaseStructure
from swarms.structs.base_swarm import BaseSwarm from swarms.structs.base_swarm import BaseSwarm
@ -13,10 +14,10 @@ from swarms.structs.graph_workflow import (
NodeType, NodeType,
) )
from swarms.structs.groupchat import ( from swarms.structs.groupchat import (
GroupChat, AgentResponse,
ChatHistory, ChatHistory,
ChatTurn, ChatTurn,
AgentResponse, GroupChat,
expertise_based, expertise_based,
) )
from swarms.structs.majority_voting import ( from swarms.structs.majority_voting import (
@ -38,6 +39,7 @@ from swarms.structs.multi_agent_exec import (
run_agents_with_tasks_concurrently, run_agents_with_tasks_concurrently,
run_single_agent, run_single_agent,
) )
from swarms.structs.multi_agent_orchestrator import MultiAgentRouter
from swarms.structs.queue_swarm import TaskQueueSwarm from swarms.structs.queue_swarm import TaskQueueSwarm
from swarms.structs.rearrange import AgentRearrange, rearrange from swarms.structs.rearrange import AgentRearrange, rearrange
from swarms.structs.round_robin import RoundRobinSwarm from swarms.structs.round_robin import RoundRobinSwarm
@ -79,8 +81,6 @@ from swarms.structs.utils import (
find_token_in_text, find_token_in_text,
parse_tasks, parse_tasks,
) )
from swarms.structs.async_workflow import AsyncWorkflow
from swarms.structs.multi_agent_orchestrator import MultiAgentRouter
__all__ = [ __all__ = [
"Agent", "Agent",

@ -54,6 +54,7 @@ from swarms.utils.wrapper_clusterop import (
exec_callable_with_clusterops, exec_callable_with_clusterops,
) )
from swarms.telemetry.capture_sys_data import log_agent_data from swarms.telemetry.capture_sys_data import log_agent_data
from swarms.agents.agent_print import agent_print
# Utils # Utils
@ -889,7 +890,33 @@ class Agent:
# Check and execute tools # Check and execute tools
if self.tools is not None: if self.tools is not None:
self.parse_and_execute_tools(response) out = self.parse_and_execute_tools(
response
)
self.short_memory.add(
role="Tool Executor", content=out
)
agent_print(
f"{self.agent_name} - Tool Executor",
out,
loop_count,
self.streaming_on,
)
out = self.llm.run(out)
agent_print(
f"{self.agent_name} - Agent Analysis",
out,
loop_count,
self.streaming_on,
)
self.short_memory.add(
role=self.agent_name, content=out
)
# Add the response to the memory # Add the response to the memory
self.short_memory.add( self.short_memory.add(
@ -1209,31 +1236,35 @@ class Agent:
return output.getvalue() return output.getvalue()
def parse_and_execute_tools(self, response: str, *args, **kwargs): def parse_and_execute_tools(self, response: str, *args, **kwargs):
try: max_retries = 3 # Maximum number of retries
logger.info("Executing tool...") retries = 0
while retries < max_retries:
# try to Execute the tool and return a string try:
out = parse_and_execute_json( logger.info("Executing tool...")
functions=self.tools,
json_string=response,
parse_md=True,
*args,
**kwargs,
)
out = str(out)
logger.info(f"Tool Output: {out}")
# Add the output to the memory
self.short_memory.add(
role="Tool Executor",
content=out,
)
except Exception as error: # try to Execute the tool and return a string
logger.error(f"Error executing tool: {error}") out = parse_and_execute_json(
raise error functions=self.tools,
json_string=response,
parse_md=True,
*args,
**kwargs,
)
logger.info(f"Tool Output: {out}")
# Add the output to the memory
# self.short_memory.add(
# role="Tool Executor",
# content=out,
# )
return out
except Exception as error:
retries += 1
logger.error(
f"Attempt {retries}: Error executing tool: {error}"
)
if retries == max_retries:
raise error
time.sleep(1) # Wait for a bit before retrying
def add_memory(self, message: str): def add_memory(self, message: str):
"""Add a memory to the agent """Add a memory to the agent
@ -2056,45 +2087,6 @@ class Agent:
return out return out
def parse_function_call_and_execute(self, response: str):
"""
Parses a function call from the given response and executes it.
Args:
response (str): The response containing the function call.
Returns:
None
Raises:
Exception: If there is an error parsing and executing the function call.
"""
try:
if self.tools is not None:
tool_call_output = parse_and_execute_json(
self.tools, response, parse_md=True
)
if tool_call_output is not str:
tool_call_output = str(tool_call_output)
logger.info(f"Tool Call Output: {tool_call_output}")
self.short_memory.add(
role=self.agent_name,
content=tool_call_output,
)
return tool_call_output
except Exception as error:
logger.error(
f"Error parsing and executing function call: {error}"
)
# Raise a custom exception with the error message
raise Exception(
"Error parsing and executing function call"
) from error
def activate_agentops(self): def activate_agentops(self):
if self.agent_ops_on is True: if self.agent_ops_on is True:
try: try:

@ -0,0 +1,318 @@
import base64
import json
import uuid
from datetime import datetime
from dataclasses import dataclass
from typing import Optional, Union, Dict, List
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
@dataclass
class EncryptedMessage:
"""Structure for encrypted messages between agents"""
sender_id: str
receiver_id: str
encrypted_content: bytes
timestamp: float
message_id: str
session_id: str
class EncryptionSession:
"""Represents an encrypted communication session between agents"""
def __init__(
self,
session_id: str,
agent_ids: List[str],
encrypted_keys: Dict[str, bytes],
created_at: datetime,
):
self.session_id = session_id
self.agent_ids = agent_ids
self.encrypted_keys = encrypted_keys
self.created_at = created_at
class AgentEncryption:
"""
Handles encryption for agent data both at rest and in transit.
Supports both symmetric (for data at rest) and asymmetric (for data in transit) encryption.
Also supports secure multi-agent communication.
"""
def __init__(
self,
agent_id: Optional[str] = None,
encryption_key: Optional[str] = None,
enable_transit_encryption: bool = False,
enable_rest_encryption: bool = False,
enable_multi_agent: bool = False,
):
self.agent_id = agent_id or str(uuid.uuid4())
self.enable_transit_encryption = enable_transit_encryption
self.enable_rest_encryption = enable_rest_encryption
self.enable_multi_agent = enable_multi_agent
# Multi-agent communication storage
self.sessions: Dict[str, EncryptionSession] = {}
self.known_agents: Dict[str, "AgentEncryption"] = {}
if enable_rest_encryption:
# Initialize encryption for data at rest
if encryption_key:
self.encryption_key = base64.urlsafe_b64encode(
PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=f"agent_{self.agent_id}".encode(), # Unique salt per agent
iterations=100000,
).derive(encryption_key.encode())
)
else:
self.encryption_key = Fernet.generate_key()
self.cipher_suite = Fernet(self.encryption_key)
if enable_transit_encryption or enable_multi_agent:
# Generate RSA key pair for transit encryption
self.private_key = rsa.generate_private_key(
public_exponent=65537, key_size=2048
)
self.public_key = self.private_key.public_key()
def register_agent(
self, agent_id: str, agent_encryption: "AgentEncryption"
) -> None:
"""Register another agent for secure communication"""
if not self.enable_multi_agent:
raise ValueError("Multi-agent support is not enabled")
self.known_agents[agent_id] = agent_encryption
def create_session(self, agent_ids: List[str]) -> str:
"""Create a new encrypted session between multiple agents"""
if not self.enable_multi_agent:
raise ValueError("Multi-agent support is not enabled")
session_id = str(uuid.uuid4())
# Generate a shared session key
session_key = Fernet.generate_key()
# Create encrypted copies of the session key for each agent
encrypted_keys = {}
for agent_id in agent_ids:
if (
agent_id not in self.known_agents
and agent_id != self.agent_id
):
raise ValueError(f"Agent {agent_id} not registered")
if agent_id == self.agent_id:
agent_public_key = self.public_key
else:
agent_public_key = self.known_agents[
agent_id
].public_key
encrypted_key = agent_public_key.encrypt(
session_key,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None,
),
)
encrypted_keys[agent_id] = encrypted_key
# Store session information
self.sessions[session_id] = EncryptionSession(
session_id=session_id,
agent_ids=agent_ids,
encrypted_keys=encrypted_keys,
created_at=datetime.now(),
)
return session_id
def encrypt_message(
self,
content: Union[str, dict],
receiver_id: str,
session_id: str,
) -> EncryptedMessage:
"""Encrypt a message for another agent within a session"""
if not self.enable_multi_agent:
raise ValueError("Multi-agent support is not enabled")
if session_id not in self.sessions:
raise ValueError("Invalid session ID")
session = self.sessions[session_id]
if (
self.agent_id not in session.agent_ids
or receiver_id not in session.agent_ids
):
raise ValueError("Sender or receiver not in session")
# Serialize content if it's a dictionary
if isinstance(content, dict):
content = json.dumps(content)
# Get the session key
encrypted_session_key = session.encrypted_keys[self.agent_id]
session_key = self.decrypt_session_key(encrypted_session_key)
# Create Fernet cipher with session key
cipher = Fernet(session_key)
# Encrypt the message
encrypted_content = cipher.encrypt(content.encode())
return EncryptedMessage(
sender_id=self.agent_id,
receiver_id=receiver_id,
encrypted_content=encrypted_content,
timestamp=datetime.now().timestamp(),
message_id=str(uuid.uuid4()),
session_id=session_id,
)
def decrypt_message(
self, message: EncryptedMessage
) -> Union[str, dict]:
"""Decrypt a message from another agent"""
if not self.enable_multi_agent:
raise ValueError("Multi-agent support is not enabled")
if message.session_id not in self.sessions:
raise ValueError("Invalid session ID")
if self.agent_id != message.receiver_id:
raise ValueError("Message not intended for this agent")
session = self.sessions[message.session_id]
# Get the session key
encrypted_session_key = session.encrypted_keys[self.agent_id]
session_key = self.decrypt_session_key(encrypted_session_key)
# Create Fernet cipher with session key
cipher = Fernet(session_key)
# Decrypt the message
decrypted_content = cipher.decrypt(
message.encrypted_content
).decode()
# Try to parse as JSON
try:
return json.loads(decrypted_content)
except json.JSONDecodeError:
return decrypted_content
def decrypt_session_key(self, encrypted_key: bytes) -> bytes:
"""Decrypt a session key using the agent's private key"""
return self.private_key.decrypt(
encrypted_key,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None,
),
)
# Original methods preserved below
def encrypt_at_rest(self, data: Union[str, dict, bytes]) -> bytes:
"""Encrypts data for storage"""
if not self.enable_rest_encryption:
return (
data
if isinstance(data, bytes)
else str(data).encode()
)
if isinstance(data, dict):
data = json.dumps(data)
if isinstance(data, str):
data = data.encode()
return self.cipher_suite.encrypt(data)
def decrypt_at_rest(
self, encrypted_data: bytes
) -> Union[str, dict]:
"""Decrypts stored data"""
if not self.enable_rest_encryption:
return encrypted_data.decode()
decrypted_data = self.cipher_suite.decrypt(encrypted_data)
try:
return json.loads(decrypted_data)
except json.JSONDecodeError:
return decrypted_data.decode()
def encrypt_for_transit(self, data: Union[str, dict]) -> bytes:
"""Encrypts data for transmission"""
if not self.enable_transit_encryption:
return str(data).encode()
if isinstance(data, dict):
data = json.dumps(data)
return self.public_key.encrypt(
data.encode(),
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None,
),
)
def decrypt_from_transit(
self, data: Union[bytes, str]
) -> Union[str, dict]:
"""Decrypts received data, handling both encrypted and unencrypted inputs"""
if not self.enable_transit_encryption:
return data.decode() if isinstance(data, bytes) else data
try:
if isinstance(data, bytes) and len(data) == 256:
decrypted_data = self.private_key.decrypt(
data,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None,
),
).decode()
else:
return (
data.decode() if isinstance(data, bytes) else data
)
try:
return json.loads(decrypted_data)
except json.JSONDecodeError:
return decrypted_data
except ValueError:
return data.decode() if isinstance(data, bytes) else data
def get_public_key_pem(self) -> bytes:
"""Returns the public key in PEM format for sharing"""
if (
not self.enable_transit_encryption
and not self.enable_multi_agent
):
return b""
return self.public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)

@ -12,6 +12,8 @@ from swarms.structs.mixture_of_agents import MixtureOfAgents
from swarms.structs.rearrange import AgentRearrange from swarms.structs.rearrange import AgentRearrange
from swarms.structs.sequential_workflow import SequentialWorkflow from swarms.structs.sequential_workflow import SequentialWorkflow
from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm
from swarms.structs.groupchat import GroupChat
from swarms.structs.multi_agent_orchestrator import MultiAgentRouter
from swarms.structs.swarm_matcher import swarm_matcher from swarms.structs.swarm_matcher import swarm_matcher
from swarms.utils.wrapper_clusterop import ( from swarms.utils.wrapper_clusterop import (
exec_callable_with_clusterops, exec_callable_with_clusterops,
@ -26,6 +28,8 @@ SwarmType = Literal[
"SpreadSheetSwarm", "SpreadSheetSwarm",
"SequentialWorkflow", "SequentialWorkflow",
"ConcurrentWorkflow", "ConcurrentWorkflow",
"GroupChat",
"MultiAgentRouter",
"auto", "auto",
] ]
@ -137,6 +141,7 @@ class SwarmRouter:
documents: List[str] = [], # A list of docs file paths documents: List[str] = [], # A list of docs file paths
output_type: str = "string", # Md, PDF, Txt, csv output_type: str = "string", # Md, PDF, Txt, csv
no_cluster_ops: bool = False, no_cluster_ops: bool = False,
speaker_fn: callable = None,
*args, *args,
**kwargs, **kwargs,
): ):
@ -154,6 +159,7 @@ class SwarmRouter:
self.documents = documents self.documents = documents
self.output_type = output_type self.output_type = output_type
self.no_cluster_ops = no_cluster_ops self.no_cluster_ops = no_cluster_ops
self.speaker_fn = speaker_fn
self.logs = [] self.logs = []
self.reliability_check() self.reliability_check()
@ -174,8 +180,6 @@ class SwarmRouter:
if self.rules is not None: if self.rules is not None:
self.handle_rules() self.handle_rules()
# let's make a function that checks the agents parameter and disables clusterops
def deactivate_clusterops(self): def deactivate_clusterops(self):
for agent in self.agents: for agent in self.agents:
agent.do_not_use_cluster_ops = True agent.do_not_use_cluster_ops = True
@ -295,6 +299,26 @@ class SwarmRouter:
*args, *args,
**kwargs, **kwargs,
) )
elif self.swarm_type == "GroupChat":
return GroupChat(
name=self.name,
description=self.description,
agents=self.agents,
max_loops=self.max_loops,
speaker_fn=self.speaker_fn,
*args,
**kwargs,
)
elif self.swarm_type == "MultiAgentRouter":
return MultiAgentRouter(
name=self.name,
description=self.description,
agents=self.agents,
shared_memory_system=self.shared_memory_system,
output_type=self.output_type,
)
elif self.swarm_type == "SpreadSheetSwarm": elif self.swarm_type == "SpreadSheetSwarm":
return SpreadSheetSwarm( return SpreadSheetSwarm(
name=self.name, name=self.name,

@ -12,17 +12,19 @@ def parse_and_execute_json(
json_string: str, json_string: str,
parse_md: bool = False, parse_md: bool = False,
verbose: bool = False, verbose: bool = False,
return_str: bool = True, max_retries: int = 3,
) -> dict: ) -> str:
""" """
Parses and executes a JSON string containing function names and parameters. Parses and executes a JSON string containing function names and parameters.
Args: Args:
functions (List[callable]): A list of callable functions. functions (List[Callable[..., Any]]): A list of callable functions.
json_string (str): The JSON string to parse and execute. json_string (str): The JSON string to parse and execute.
parse_md (bool): Flag indicating whether to extract code from Markdown. parse_md (bool): Flag indicating whether to extract code from Markdown.
verbose (bool): Flag indicating whether to enable verbose logging. verbose (bool): Flag indicating whether to enable verbose logging.
return_str (bool): Flag indicating whether to return a JSON string. return_str (bool): Flag indicating whether to return a JSON string.
max_retries (int): Maximum number of retries for executing functions.
Returns: Returns:
dict: A dictionary containing the results of executing the functions with the parsed parameters. dict: A dictionary containing the results of executing the functions with the parsed parameters.
""" """
@ -30,10 +32,20 @@ def parse_and_execute_json(
raise ValueError("Functions and JSON string are required") raise ValueError("Functions and JSON string are required")
if parse_md: if parse_md:
json_string = extract_code_from_markdown(json_string) try:
json_string = extract_code_from_markdown(json_string)
except Exception as e:
logger.error(f"Error extracting code from Markdown: {e}")
return {"error": f"Markdown parsing failed: {str(e)}"}
try: try:
# Create function name to function mapping # Ensure JSON string is stripped of extraneous whitespace
json_string = json_string.strip()
if not json_string:
raise ValueError(
"JSON string is empty after stripping whitespace"
)
function_dict = {func.__name__: func for func in functions} function_dict = {func.__name__: func for func in functions}
if verbose: if verbose:
@ -42,83 +54,80 @@ def parse_and_execute_json(
) )
logger.info(f"Processing JSON: {json_string}") logger.info(f"Processing JSON: {json_string}")
# Parse JSON data try:
data = json.loads(json_string) data = json.loads(json_string)
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON format: {e}")
return {"error": f"Invalid JSON format: {str(e)}"}
# Handle both single function and function list formats
function_list = [] function_list = []
if "functions" in data: if "functions" in data:
function_list = data["functions"] function_list = data["functions"]
elif "function" in data: elif "function" in data:
function_list = [data["function"]] function_list = [data["function"]]
else: else:
function_list = [ function_list = [data]
data
] # Assume entire object is single function
# Ensure function_list is a list and filter None values
if isinstance(function_list, dict): if isinstance(function_list, dict):
function_list = [function_list] function_list = [function_list]
function_list = [f for f in function_list if f] function_list = [f for f in function_list if f]
if verbose: if verbose:
logger.info(f"Processing {len(function_list)} functions") logger.info(f"Processing {len(function_list)} functions")
results = {} results = {}
for function_data in function_list: for function_data in function_list:
function_name = function_data.get("name") function_name = function_data.get("name")
parameters = function_data.get("parameters", {}) parameters = function_data.get("parameters", {})
if not function_name: if not function_name:
logger.warning("Function data missing name field") logger.warning("Function data missing 'name' field")
continue continue
if verbose: if verbose:
logger.info( logger.info(
f"Executing {function_name} with params: {parameters}" f"Executing {function_name} with parameters: {parameters}"
) )
if function_name not in function_dict: if function_name not in function_dict:
logger.warning(f"Function {function_name} not found") logger.warning(
results[function_name] = None f"Function '{function_name}' not found"
)
results[function_name] = "Error: Function not found"
continue continue
try: for attempt in range(max_retries):
result = function_dict[function_name](**parameters) try:
results[function_name] = str(result) result = function_dict[function_name](
if verbose: **parameters
logger.info(
f"Result for {function_name}: {result}"
) )
except Exception as e: results[function_name] = str(result)
logger.error( if verbose:
f"Error executing {function_name}: {str(e)}" logger.info(
) f"Result for {function_name}: {result}"
results[function_name] = f"Error: {str(e)}" )
break
except Exception as e:
logger.error(
f"Attempt {attempt + 1} failed for {function_name}: {e}"
)
if attempt == max_retries - 1:
results[function_name] = (
f"Error after {max_retries} attempts: {str(e)}"
)
# Format final results data = {
if len(results) == 1: "results": results,
# Return single result directly "summary": "\n".join(
data = {"result": next(iter(results.values()))} f"{k}: {v}" for k, v in results.items()
else: ),
# Return all results }
data = {
"results": results, return json.dumps(data, indent=4)
"summary": "\n".join(
f"{k}: {v}" for k, v in results.items()
),
}
if return_str:
return json.dumps(data)
else:
return data
except json.JSONDecodeError as e:
error = f"Invalid JSON format: {str(e)}"
logger.error(error)
return {"error": error}
except Exception as e: except Exception as e:
error = f"Error parsing and executing JSON: {str(e)}" error = f"Unexpected error during execution: {str(e)}"
logger.error(error) logger.error(error)
return {"error": error} return {"error": error}

@ -0,0 +1,30 @@
from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT,
)
from swarms_tools import (
coin_gecko_coin_api,
fetch_htx_data,
)
# Initialize the agent
agent = Agent(
agent_name="Financial-Analysis-Agent",
agent_description="Personal finance advisor agent",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
max_loops=1,
model_name="gpt-4o",
dynamic_temperature_enabled=True,
user_name="swarms_corp",
return_step_meta=False,
output_type="str", # "json", "dict", "csv" OR "string" "yaml" and
auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task
max_tokens=4000, # max output tokens
saved_state_path="agent_00.json",
interactive=False,
)
agent.run(
f"Analyze the $swarms token on HTX with data: {fetch_htx_data('swarms')}. Additionally, consider the following CoinGecko data: {coin_gecko_coin_api('swarms')}"
)

@ -3,11 +3,11 @@ from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT, FINANCIAL_AGENT_SYS_PROMPT,
) )
from swarms_tools import ( from swarms_tools import (
fetch_stock_news,
coin_gecko_coin_api, coin_gecko_coin_api,
fetch_htx_data, fetch_htx_data,
) )
# Initialize the agent # Initialize the agent
agent = Agent( agent = Agent(
agent_name="Financial-Analysis-Agent", agent_name="Financial-Analysis-Agent",
@ -17,14 +17,15 @@ agent = Agent(
model_name="gpt-4o", model_name="gpt-4o",
dynamic_temperature_enabled=True, dynamic_temperature_enabled=True,
user_name="swarms_corp", user_name="swarms_corp",
retry_attempts=3,
context_length=8192,
return_step_meta=False, return_step_meta=False,
output_type="str", # "json", "dict", "csv" OR "string" "yaml" and output_type="str", # "json", "dict", "csv" OR "string" "yaml" and
auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task
max_tokens=4000, # max output tokens max_tokens=4000, # max output tokens
saved_state_path="agent_00.json", saved_state_path="agent_00.json",
interactive=False, interactive=False,
tools=[fetch_stock_news, coin_gecko_coin_api, fetch_htx_data],
) )
agent.run( agent.run("Analyze the $swarms token on htx")
f"Analyze the $swarms token on HTX with data: {fetch_htx_data('swarms')}. Additionally, consider the following CoinGecko data: {coin_gecko_coin_api('swarms')}"
)

@ -0,0 +1,92 @@
# Define a simple testing framework
from swarms.tools.tool_parse_exec import parse_and_execute_json
def run_test(test_name, test_func):
print(f"Running test: {test_name}")
print("------------------------------------------------")
try:
test_func()
print(f"{test_name} passed")
print("------------------------------------------------")
except Exception as e:
print(f"{test_name} failed: {str(e)}")
print("------------------------------------------------")
# Mock functions for testing
def mock_function_a(param1, param2):
return param1 + param2
def mock_function_b(param1):
if param1 < 0:
raise ValueError("Negative value not allowed")
return param1 * 2
# Test cases
def test_parse_and_execute_json_success():
functions = [mock_function_a, mock_function_b]
json_string = '{"functions": [{"name": "mock_function_a", "parameters": {"param1": 1, "param2": 2}}, {"name": "mock_function_b", "parameters": {"param1": 3}}]}'
result = parse_and_execute_json(functions, json_string)
expected_result = {
"results": {"mock_function_a": "3", "mock_function_b": "6"},
"summary": "mock_function_a: 3\nmock_function_b: 6",
}
assert (
result == expected_result
), f"Expected {expected_result}, but got {result}"
def test_parse_and_execute_json_function_not_found():
functions = [mock_function_a]
json_string = '{"functions": [{"name": "non_existent_function", "parameters": {}}]}'
result = parse_and_execute_json(functions, json_string)
expected_result = {
"results": {
"non_existent_function": "Error: Function non_existent_function not found"
},
"summary": "non_existent_function: Error: Function non_existent_function not found",
}
assert (
result == expected_result
), f"Expected {expected_result}, but got {result}"
def test_parse_and_execute_json_error_handling():
functions = [mock_function_b]
json_string = '{"functions": [{"name": "mock_function_b", "parameters": {"param1": -1}}]}'
result = parse_and_execute_json(functions, json_string)
expected_result = {
"results": {
"mock_function_b": "Error: Negative value not allowed"
},
"summary": "mock_function_b: Error: Negative value not allowed",
}
assert (
result == expected_result
), f"Expected {expected_result}, but got {result}"
# Run tests
run_test(
"Test parse_and_execute_json success",
test_parse_and_execute_json_success,
)
print("------------------------------------------------")
run_test(
"Test parse_and_execute_json function not found",
test_parse_and_execute_json_function_not_found,
)
print("------------------------------------------------")
run_test(
"Test parse_and_execute_json error handling",
test_parse_and_execute_json_error_handling,
)
Loading…
Cancel
Save