[DOCS][CSS]

pull/652/head
Your Name 1 month ago
parent e6d616b2c3
commit c2cd677575

@ -1,5 +1,4 @@
/* * Further customization as needed */ */
/* Further customization as needed */
.md-typeset__table { .md-typeset__table {
@ -10,6 +9,15 @@
display: table; display: table;
} }
/* Dark mode */
[data-md-color-scheme="slate"] {
--md-default-bg-color: black;
}
.header__ellipsis {
color: black;
}
/* /*
:root { :root {
--md-primary-fg-color: #EE0F0F; --md-primary-fg-color: #EE0F0F;

@ -0,0 +1,308 @@
import os
from swarms import Agent
from swarm_models import OpenAIChat
from web3 import Web3
from typing import Dict, Optional, Any
from datetime import datetime
import asyncio
from loguru import logger
from dotenv import load_dotenv
import csv
import requests
import time
BLOCKCHAIN_AGENT_PROMPT = """
You are an expert blockchain and cryptocurrency analyst with deep knowledge of Ethereum markets and DeFi ecosystems.
You have access to real-time ETH price data and transaction information.
For each transaction, analyze:
1. MARKET CONTEXT
- Current ETH price and what this transaction means in USD terms
- How this movement compares to typical market volumes
- Whether this could impact ETH price
2. BEHAVIORAL ANALYSIS
- Whether this appears to be institutional, whale, or protocol movement
- If this fits any known wallet patterns or behaviors
- Signs of smart contract interaction or DeFi activity
3. RISK & IMPLICATIONS
- Potential market impact or price influence
- Signs of potential market manipulation or unusual activity
- Protocol or DeFi risks if applicable
4. STRATEGIC INSIGHTS
- What traders should know about this movement
- Potential chain reactions or follow-up effects
- Market opportunities or risks created
Write naturally but precisely. Focus on actionable insights and important patterns.
Your analysis helps traders and researchers understand significant market movements in real-time."""
class EthereumAnalyzer:
def __init__(self, min_value_eth: float = 100.0):
load_dotenv()
logger.add(
"eth_analysis.log",
rotation="500 MB",
retention="10 days",
level="INFO",
format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
)
self.w3 = Web3(
Web3.HTTPProvider(
"https://mainnet.infura.io/v3/9aa3d95b3bc440fa88ea12eaa4456161"
)
)
if not self.w3.is_connected():
raise ConnectionError(
"Failed to connect to Ethereum network"
)
self.min_value_eth = min_value_eth
self.last_processed_block = self.w3.eth.block_number
self.eth_price = self.get_eth_price()
self.last_price_update = time.time()
# Initialize AI agent
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise ValueError(
"OpenAI API key not found in environment variables"
)
model = OpenAIChat(
openai_api_key=api_key,
model_name="gpt-4",
temperature=0.1,
)
self.agent = Agent(
agent_name="Ethereum-Analysis-Agent",
system_prompt=BLOCKCHAIN_AGENT_PROMPT,
llm=model,
max_loops=1,
autosave=True,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="eth_agent.json",
user_name="eth_analyzer",
retry_attempts=1,
context_length=200000,
output_type="string",
streaming_on=False,
)
self.csv_filename = "ethereum_analysis.csv"
self.initialize_csv()
def get_eth_price(self) -> float:
"""Get current ETH price from CoinGecko API."""
try:
response = requests.get(
"https://api.coingecko.com/api/v3/simple/price",
params={"ids": "ethereum", "vs_currencies": "usd"},
)
return float(response.json()["ethereum"]["usd"])
except Exception as e:
logger.error(f"Error fetching ETH price: {str(e)}")
return 0.0
def update_eth_price(self):
"""Update ETH price if more than 5 minutes have passed."""
if time.time() - self.last_price_update > 300: # 5 minutes
self.eth_price = self.get_eth_price()
self.last_price_update = time.time()
logger.info(f"Updated ETH price: ${self.eth_price:,.2f}")
def initialize_csv(self):
"""Initialize CSV file with headers."""
headers = [
"timestamp",
"transaction_hash",
"from_address",
"to_address",
"value_eth",
"value_usd",
"eth_price",
"gas_used",
"gas_price_gwei",
"block_number",
"analysis",
]
if not os.path.exists(self.csv_filename):
with open(self.csv_filename, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(headers)
async def analyze_transaction(
self, tx_hash: str
) -> Optional[Dict[str, Any]]:
"""Analyze a single transaction."""
try:
tx = self.w3.eth.get_transaction(tx_hash)
receipt = self.w3.eth.get_transaction_receipt(tx_hash)
value_eth = float(self.w3.from_wei(tx.value, "ether"))
if value_eth < self.min_value_eth:
return None
block = self.w3.eth.get_block(tx.blockNumber)
# Update ETH price if needed
self.update_eth_price()
value_usd = value_eth * self.eth_price
analysis = {
"timestamp": datetime.fromtimestamp(
block.timestamp
).isoformat(),
"transaction_hash": tx_hash.hex(),
"from_address": tx["from"],
"to_address": tx.to if tx.to else "Contract Creation",
"value_eth": value_eth,
"value_usd": value_usd,
"eth_price": self.eth_price,
"gas_used": receipt.gasUsed,
"gas_price_gwei": float(
self.w3.from_wei(tx.gasPrice, "gwei")
),
"block_number": tx.blockNumber,
}
# Check if it's a contract
if tx.to:
code = self.w3.eth.get_code(tx.to)
analysis["is_contract"] = len(code) > 0
# Get contract events
if analysis["is_contract"]:
analysis["events"] = receipt.logs
return analysis
except Exception as e:
logger.error(
f"Error analyzing transaction {tx_hash}: {str(e)}"
)
return None
def prepare_analysis_prompt(self, tx_data: Dict[str, Any]) -> str:
"""Prepare detailed analysis prompt including price context."""
value_usd = tx_data["value_usd"]
eth_price = tx_data["eth_price"]
prompt = f"""Analyze this Ethereum transaction in current market context:
Transaction Details:
- Value: {tx_data['value_eth']:.2f} ETH (${value_usd:,.2f} at current price)
- Current ETH Price: ${eth_price:,.2f}
- From: {tx_data['from_address']}
- To: {tx_data['to_address']}
- Contract Interaction: {tx_data.get('is_contract', False)}
- Gas Used: {tx_data['gas_used']:,} units
- Gas Price: {tx_data['gas_price_gwei']:.2f} Gwei
- Block: {tx_data['block_number']}
- Timestamp: {tx_data['timestamp']}
{f"Event Count: {len(tx_data['events'])} events" if tx_data.get('events') else "No contract events"}
Consider the transaction's significance given the current ETH price of ${eth_price:,.2f} and total USD value of ${value_usd:,.2f}.
Analyze market impact, patterns, risks, and strategic implications."""
return prompt
def save_to_csv(self, tx_data: Dict[str, Any], ai_analysis: str):
"""Save transaction data and analysis to CSV."""
row = [
tx_data["timestamp"],
tx_data["transaction_hash"],
tx_data["from_address"],
tx_data["to_address"],
tx_data["value_eth"],
tx_data["value_usd"],
tx_data["eth_price"],
tx_data["gas_used"],
tx_data["gas_price_gwei"],
tx_data["block_number"],
ai_analysis.replace("\n", " "),
]
with open(self.csv_filename, "a", newline="") as f:
writer = csv.writer(f)
writer.writerow(row)
async def monitor_transactions(self):
"""Monitor and analyze transactions one at a time."""
logger.info(
f"Starting transaction monitor (minimum value: {self.min_value_eth} ETH)"
)
while True:
try:
current_block = self.w3.eth.block_number
block = self.w3.eth.get_block(
current_block, full_transactions=True
)
for tx in block.transactions:
tx_analysis = await self.analyze_transaction(
tx.hash
)
if tx_analysis:
# Get AI analysis
analysis_prompt = (
self.prepare_analysis_prompt(tx_analysis)
)
ai_analysis = self.agent.run(analysis_prompt)
print(ai_analysis)
# Save to CSV
self.save_to_csv(tx_analysis, ai_analysis)
# Print analysis
print("\n" + "=" * 50)
print("New Transaction Analysis")
print(
f"Hash: {tx_analysis['transaction_hash']}"
)
print(
f"Value: {tx_analysis['value_eth']:.2f} ETH (${tx_analysis['value_usd']:,.2f})"
)
print(
f"Current ETH Price: ${self.eth_price:,.2f}"
)
print("=" * 50)
print(ai_analysis)
print("=" * 50 + "\n")
await asyncio.sleep(1) # Wait for next block
except Exception as e:
logger.error(f"Error in monitoring loop: {str(e)}")
await asyncio.sleep(1)
async def main():
"""Entry point for the analysis system."""
analyzer = EthereumAnalyzer(min_value_eth=100.0)
await analyzer.monitor_transactions()
if __name__ == "__main__":
print("Starting Ethereum Transaction Analyzer...")
print("Saving results to ethereum_analysis.csv")
print("Press Ctrl+C to stop")
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nStopping analyzer...")

@ -0,0 +1,4 @@
timestamp,transaction_hash,from_address,to_address,value_eth,gas_used,gas_price_gwei,block_number,analysis
2024-11-27T13:50:35,ddbb665bc75fe848e7ce3d3ce1729243e92466c38ca407deccce8bf629987652,0x267be1C1D684F78cb4F6a176C4911b741E4Ffdc0,0xa40dFEE99E1C85DC97Fdc594b16A460717838703,3200.0,21000,19.968163737,21281878,"Transaction Analysis: This transaction represents a significant transfer of value in the Ethereum network with 3200 ETH (~$6.72 million USD at the current rate) moved from one address to another. It is essential to note that this transaction did not involve smart contract interaction, suggesting it could be a straightforward transfer of funds rather than part of a more complex operation. Looking at the broader market context, large transactions like this can potentially indicate major investment activities or redistribution of assets, which can have ripple effects in the market. If this transaction is part of a larger pattern of significant transfers, it could suggest substantial liquidity moving in the Ethereum ecosystem, possibly affecting the ETH prices. From a DeFi point of view, since there's no contract interaction, it's difficult to infer any direct implications. However, given the substantial value involved, it could be a step in preparation for involvement in DeFi protocols or a move from one DeFi platform to another by a large investor. The transaction fee paid, calculated from the given Gas Used and Gas Price, appears to be within reasonable range. This suggests that the transaction was not rushed and that the sender was willing to wait for this transaction to be confirmed, which might hint towards the non-urgent nature of the transaction. As for potential risk factors or security concerns, the transaction itself appears to be standard and doesn't raise any immediate red flags. However, the parties involved should always be cautious about the address security, maintaining privacy, and avoiding social engineering attacks. For traders and investors, this transaction can be interpreted as a potential bullish sign if it signifies increased liquidity and investment in the Ethereum market, especially if it's followed by similar large transfers. However, due to the anonymous nature of the transaction, it's critical to combine this with other market indicators and not to rely solely on transaction analysis for investment decisions."
2024-11-27T13:52:23,b98bcbf6d57a158b67a126d8f023766e03fb15c3e74becc1189d4244fda61a13,0xEae7380dD4CeF6fbD1144F49E4D1e6964258A4F4,0x28C6c06298d514Db089934071355E5743bf21d60,401.99463589018103,21000,14.978063737,21281887,"Ethereum-Analysis-Agent: Transaction Analysis: This transaction marks a significant transfer of 401.99 ETH, approximately $845,000 at the current rate. The transaction did not involve any smart contract interaction, suggesting a simple fund transfer rather than a complicated operation or interaction with a DeFi protocol. From a broader market perspective, this transaction is meaningful but not as potentially impactful as larger transactions. It can nonetheless be part of a larger pattern of asset movement within the Ethereum ecosystem. If this transaction is part of larger investment activities, it could suggest an increase in demand for ETH and potentially impact its price. Without contract interaction, it's challenging to assess direct implications for DeFi protocols. However, the substantial ETH transfer could suggest a step towards participation in DeFi activities, or a movement of funds between different DeFi platforms. The transaction fee appears reasonable, given the Gas Used and Gas Price. This implies that the transaction wasn't urgent, and the sender was willing to wait for the transaction to be confirmed, indicating a non-critical movement of funds. In terms of security and risk factors, there are no immediate concerns from the transaction itself. Nevertheless, as with any crypto transaction, the parties involved should ensure secure storage of their keys, maintain privacy, and be wary of potential phishing or social engineering attacks. For traders and investors, this transaction could be seen as a bullish sign if it forms part of a trend of increased investment activities in the Ethereum market. However, it's important to remember that transaction analysis should be combined with other market indicators due to the anonymous nature of blockchain transactions."
2024-11-27T13:59:47,a985b74fd3dfee09cbe4a2e6890509e583a3f0ce13f68c98e82996e0f66428be,0xf7858Da8a6617f7C6d0fF2bcAFDb6D2eeDF64840,0xA294cCa691e4C83B1fc0c8d63D9a3eeF0A196DE1,136.0668,494665.408728,3635.46,21000,18.866443971,21281923,"1. MARKET CONTEXT The transaction of 136.07 ETH, equivalent to $494,665.41, is a significant movement in the Ethereum market. However, compared to the daily trading volume of Ethereum, which often exceeds billions of dollars, this transaction is not large enough to significantly impact the ETH price on its own. 2. BEHAVIORAL ANALYSIS The transaction does not appear to be a protocol movement as there is no contract interaction involved. It could be a whale movement, given the substantial amount of ETH transferred. However, without additional information about the wallets involved, it's difficult to definitively determine the nature of the transaction. The gas price of 18.87 Gwei is relatively standard, suggesting that the transaction was not urgent or time-sensitive. 3. RISK & IMPLICATIONS The transaction does not show signs of market manipulation or unusual activity. The absence of contract interaction suggests that this transaction does not directly involve DeFi protocols, reducing the risk of smart contract vulnerabilities or DeFi-related risks. However, the large amount of ETH transferred could potentially influence market sentiment if it is part of a larger trend of similar transactions. 4. STRATEGIC INSIGHTS Traders should note this transaction as part of the broader market activity. While a single transaction of this size is unlikely to significantly impact the market, a series of similar transactions could indicate a larger trend. If this is part of a larger movement of ETH out of exchanges, it could suggest a decrease in selling pressure, which could be bullish for ETH. Conversely, if this is part of a larger movement into exchanges, it could indicate an increase in selling pressure, which could be bearish for ETH. Traders should monitor the market for further similar transactions to gain a better understanding of the potential market trends."
Can't render this file because it has a wrong number of fields in line 4.

@ -1,5 +1,6 @@
import os
from typing import List, Dict, Any, Optional, Callable from typing import List, Dict, Any, Optional, Callable
from dataclasses import dataclass from dataclasses import dataclass, field
import json import json
from datetime import datetime from datetime import datetime
import inspect import inspect
@ -7,7 +8,6 @@ import typing
from typing import Union from typing import Union
from swarms import Agent from swarms import Agent
from swarm_models import OpenAIChat from swarm_models import OpenAIChat
from dotenv import load_dotenv
@dataclass @dataclass
@ -19,17 +19,6 @@ class ToolDefinition:
callable: Optional[Callable] = None callable: Optional[Callable] = None
@dataclass
class ExecutionStep:
step_id: str
tool_name: str
parameters: Dict[str, Any]
purpose: str
depends_on: List[str]
completed: bool = False
result: Optional[Any] = None
def extract_type_hints(func: Callable) -> Dict[str, Any]: def extract_type_hints(func: Callable) -> Dict[str, Any]:
"""Extract parameter types from function type hints.""" """Extract parameter types from function type hints."""
return typing.get_type_hints(func) return typing.get_type_hints(func)
@ -86,267 +75,343 @@ def extract_tool_info(func: Callable) -> ToolDefinition:
) )
class ToolUsingAgent: @dataclass
class FunctionSpec:
"""Specification for a callable tool function."""
name: str
description: str
parameters: Dict[
str, dict
] # Contains type and description for each parameter
return_type: str
return_description: str
@dataclass
class ExecutionStep:
"""Represents a single step in the execution plan."""
step_id: int
function_name: str
parameters: Dict[str, Any]
expected_output: str
completed: bool = False
result: Any = None
@dataclass
class ExecutionContext:
"""Maintains state during execution."""
task: str
steps: List[ExecutionStep] = field(default_factory=list)
results: Dict[int, Any] = field(default_factory=dict)
current_step: int = 0
history: List[Dict[str, Any]] = field(default_factory=list)
class ToolAgent:
def __init__( def __init__(
self, self,
tools: List[Callable], functions: List[Callable],
openai_api_key: str, openai_api_key: str,
model_name: str = "gpt-4", model_name: str = "gpt-4",
temperature: float = 0.1, temperature: float = 0.1,
max_loops: int = 10,
): ):
# Convert callable tools to ToolDefinitions self.functions = {func.__name__: func for func in functions}
self.available_tools = { self.function_specs = self._analyze_functions(functions)
tool.__name__: extract_tool_info(tool) for tool in tools
}
self.execution_plan: List[ExecutionStep] = []
self.current_step_index = 0
self.max_loops = max_loops
# Initialize the OpenAI model
self.model = OpenAIChat( self.model = OpenAIChat(
openai_api_key=openai_api_key, openai_api_key=openai_api_key,
model_name=model_name, model_name=model_name,
temperature=temperature, temperature=temperature,
) )
# Create system prompt with tool descriptions
self.system_prompt = self._create_system_prompt() self.system_prompt = self._create_system_prompt()
self.agent = Agent( self.agent = Agent(
agent_name="Tool-Using-Agent", agent_name="Tool-Agent",
system_prompt=self.system_prompt, system_prompt=self.system_prompt,
llm=self.model, llm=self.model,
max_loops=1, max_loops=1,
autosave=True,
verbose=True, verbose=True,
saved_state_path="tool_agent_state.json",
context_length=200000,
) )
def _analyze_functions(
self, functions: List[Callable]
) -> Dict[str, FunctionSpec]:
"""Analyze functions to create detailed specifications."""
specs = {}
for func in functions:
hints = get_type_hints(func)
sig = inspect.signature(func)
doc = inspect.getdoc(func) or ""
# Parse docstring for parameter descriptions
param_descriptions = {}
current_param = None
for line in doc.split("\n"):
if ":param" in line:
param_name = (
line.split(":param")[1].split(":")[0].strip()
)
desc = line.split(":", 2)[-1].strip()
param_descriptions[param_name] = desc
elif ":return:" in line:
return_desc = line.split(":return:")[1].strip()
# Build parameter specifications
parameters = {}
for name, param in sig.parameters.items():
param_type = hints.get(name, Any)
parameters[name] = {
"type": str(param_type),
"type_class": param_type,
"description": param_descriptions.get(name, ""),
"required": param.default == param.empty,
}
specs[func.__name__] = FunctionSpec(
name=func.__name__,
description=doc.split("\n")[0],
parameters=parameters,
return_type=str(hints.get("return", Any)),
return_description=(
return_desc if "return_desc" in locals() else ""
),
)
return specs
def _create_system_prompt(self) -> str: def _create_system_prompt(self) -> str:
"""Create system prompt with available tools information.""" """Create system prompt with detailed function specifications."""
tools_description = [] functions_desc = []
for tool_name, tool in self.available_tools.items(): for spec in self.function_specs.values():
tools_description.append( params_desc = []
for name, details in spec.parameters.items():
params_desc.append(
f" - {name}: {details['type']} - {details['description']}"
)
functions_desc.append(
f""" f"""
Tool: {tool_name} Function: {spec.name}
Description: {tool.description} Description: {spec.description}
Parameters: {json.dumps(tool.parameters, indent=2)} Parameters:
Required Parameters: {tool.required_params} {chr(10).join(params_desc)}
Returns: {spec.return_type} - {spec.return_description}
""" """
) )
output = f"""You are an autonomous agent capable of executing complex tasks using available tools. return f"""You are an AI agent that creates and executes plans using available functions.
Available Tools: Available Functions:
{chr(10).join(tools_description)} {chr(10).join(functions_desc)}
Follow these protocols: You must respond in two formats depending on the phase:
1. Create a detailed plan using available tools
2. Execute each step in order
3. Handle errors appropriately
4. Maintain execution state
5. Return results in structured format
You must ALWAYS respond in the following JSON format: 1. Planning Phase:
{{ {{
"phase": "planning",
"plan": {{ "plan": {{
"description": "Brief description of the overall plan", "description": "Overall plan description",
"steps": [ "steps": [
{{ {{
"step_number": 1, "step_id": 1,
"tool_name": "name_of_tool", "function": "function_name",
"description": "What this step accomplishes",
"parameters": {{ "parameters": {{
"param1": "value1", "param1": "value1",
"param2": "value2" "param2": "value2"
}}, }},
"expected_output": "Description of expected output" "purpose": "Why this step is needed"
}} }}
] ]
}}, }}
"reasoning": "Explanation of why this plan was chosen" }}
2. Execution Phase:
{{
"phase": "execution",
"analysis": "Analysis of current result",
"next_action": {{
"type": "continue|request_input|complete",
"reason": "Why this action was chosen",
"needed_input": {{}} # If requesting input
}}
}} }}
Before executing any tool: Always:
1. Validate all required parameters are present - Use exact function names
2. Verify parameter types match specifications - Ensure parameter types match specifications
3. Check parameter values are within valid ranges/formats - Provide clear reasoning for each decision
4. Ensure logical dependencies between steps are met
If any validation fails:
1. Return error in JSON format with specific details
2. Suggest corrections if possible
3. Do not proceed with execution
After each step execution:
1. Verify output matches expected format
2. Log results and any warnings/errors
3. Update execution state
4. Determine if plan adjustment needed
Error Handling:
1. Catch and classify all errors
2. Provide detailed error messages
3. Suggest recovery steps
4. Maintain system stability
The final output must be valid JSON that can be parsed. Always check your response can be parsed as JSON before returning.
""" """
return output
def execute_tool( def _execute_function(
self, tool_name: str, parameters: Dict[str, Any] self, spec: FunctionSpec, parameters: Dict[str, Any]
) -> Any: ) -> Any:
"""Execute a tool with given parameters.""" """Execute a function with type checking."""
tool = self.available_tools[tool_name] converted_params = {}
if not tool.callable: for name, value in parameters.items():
param_spec = spec.parameters[name]
try:
# Convert value to required type
param_type = param_spec["type_class"]
if param_type in (int, float, str, bool):
converted_params[name] = param_type(value)
else:
converted_params[name] = value
except (ValueError, TypeError) as e:
raise ValueError( raise ValueError(
f"Tool {tool_name} has no associated callable" f"Parameter '{name}' conversion failed: {str(e)}"
) )
# Convert parameters to appropriate types return self.functions[spec.name](**converted_params)
converted_params = {}
for param_name, param_value in parameters.items():
param_info = tool.parameters[param_name]
param_type = eval(
param_info["type"]
) # Note: Be careful with eval
converted_params[param_name] = param_type(param_value)
return tool.callable(**converted_params)
def run(self, task: str) -> Dict[str, Any]: def run(self, task: str) -> Dict[str, Any]:
"""Execute the complete task with proper logging and error handling.""" """Execute task with planning and step-by-step execution."""
context = ExecutionContext(task=task)
execution_log = { execution_log = {
"task": task, "task": task,
"start_time": datetime.utcnow().isoformat(), "start_time": datetime.utcnow().isoformat(),
"steps": [], "steps": [],
"final_result": None "final_result": None,
} }
try: try:
# Create and execute plan # Planning phase
plan_response = self.agent.run(f"Create a plan for: {task}") plan_prompt = f"Create a plan to: {task}"
plan_data = json.loads(plan_response) plan_response = self.agent.run(plan_prompt)
plan_data = json.loads(
plan_response.replace("System:", "").strip()
)
# Convert plan to execution steps
for step in plan_data["plan"]["steps"]:
context.steps.append(
ExecutionStep(
step_id=step["step_id"],
function_name=step["function"],
parameters=step["parameters"],
expected_output=step["purpose"],
)
)
# Extract steps from the correct path in JSON # Execution phase
steps = plan_data["plan"]["steps"] # Changed from plan_data["steps"] while context.current_step < len(context.steps):
step = context.steps[context.current_step]
print(
f"\nExecuting step {step.step_id}: {step.function_name}"
)
for step in steps:
try: try:
# Check if parameters need default values # Execute function
for param_name, param_value in step["parameters"].items(): spec = self.function_specs[step.function_name]
if isinstance(param_value, str) and not param_value.replace(".", "").isdigit(): result = self._execute_function(
# If parameter is a description rather than a value, set default spec, step.parameters
if "income" in param_name.lower():
step["parameters"][param_name] = 75000.0
elif "year" in param_name.lower():
step["parameters"][param_name] = 2024
elif "investment" in param_name.lower():
step["parameters"][param_name] = 1000.0
# Execute the tool
result = self.execute_tool(
step["tool_name"],
step["parameters"]
) )
context.results[step.step_id] = result
step.completed = True
step.result = result
# Get agent's analysis
analysis_prompt = f"""
Step {step.step_id} completed:
Function: {step.function_name}
Result: {json.dumps(result)}
Remaining steps: {len(context.steps) - context.current_step - 1}
Analyze the result and decide next action.
"""
execution_log["steps"].append({ analysis_response = self.agent.run(
"step_number": step["step_number"], analysis_prompt
"tool": step["tool_name"], )
"parameters": step["parameters"], analysis_data = json.loads(
"success": True, analysis_response.replace(
"System:", ""
).strip()
)
execution_log["steps"].append(
{
"step_id": step.step_id,
"function": step.function_name,
"parameters": step.parameters,
"result": result, "result": result,
"description": step["description"] "analysis": analysis_data,
}) }
)
if (
analysis_data["next_action"]["type"]
== "complete"
):
if (
context.current_step
< len(context.steps) - 1
):
continue
break
context.current_step += 1
except Exception as e: except Exception as e:
execution_log["steps"].append({ print(f"Error in step {step.step_id}: {str(e)}")
"step_number": step["step_number"], execution_log["steps"].append(
"tool": step["tool_name"], {
"parameters": step["parameters"], "step_id": step.step_id,
"success": False, "function": step.function_name,
"parameters": step.parameters,
"error": str(e), "error": str(e),
"description": step["description"] }
}) )
print(f"Error executing step {step['step_number']}: {str(e)}") raise
# Continue with next step instead of raising
continue # Final analysis
final_prompt = f"""
Task completed. Results:
{json.dumps(context.results, indent=2)}
Provide final analysis and recommendations.
"""
# Only mark as success if at least some steps succeeded final_analysis = self.agent.run(final_prompt)
successful_steps = [s for s in execution_log["steps"] if s["success"]]
if successful_steps:
execution_log["final_result"] = { execution_log["final_result"] = {
"success": True, "success": True,
"results": successful_steps, "results": context.results,
"reasoning": plan_data.get("reasoning", "No reasoning provided") "analysis": json.loads(
} final_analysis.replace("System:", "").strip()
else: ),
execution_log["final_result"] = {
"success": False,
"error": "No steps completed successfully",
"plan": plan_data
} }
except Exception as e: except Exception as e:
execution_log["final_result"] = { execution_log["final_result"] = {
"success": False, "success": False,
"error": str(e), "error": str(e),
"plan": plan_data if 'plan_data' in locals() else None
} }
execution_log["end_time"] = datetime.utcnow().isoformat() execution_log["end_time"] = datetime.utcnow().isoformat()
return execution_log return execution_log
# Example usage def calculate_investment_return(
if __name__ == "__main__": principal: float, rate: float, years: int
load_dotenv() ) -> float:
"""Calculate investment return with compound interest.
# Example tool functions :param principal: Initial investment amount in dollars
def research_ira_requirements() -> Dict[str, Any]: :param rate: Annual interest rate as decimal (e.g., 0.07 for 7%)
"""Research and return ROTH IRA eligibility requirements.""" :param years: Number of years to invest
return { :return: Final investment value
"age_requirement": "Must have earned income", """
"income_limits": {"single": 144000, "married": 214000}, return principal * (1 + rate) ** years
}
def calculate_contribution_limit(
income: float, tax_year: int
) -> Dict[str, float]:
"""Calculate maximum ROTH IRA contribution based on income and tax year."""
base_limit = 6000 if tax_year <= 2022 else 6500
if income > 144000:
return {"limit": 0}
return {"limit": base_limit}
def find_brokers(min_investment: float) -> List[Dict[str, Any]]:
"""Find suitable brokers for ROTH IRA based on minimum investment."""
return [
{"name": "Broker A", "min_investment": min_investment},
{
"name": "Broker B",
"min_investment": min_investment * 1.5,
},
]
# Initialize agent with tools agent = ToolAgent(
agent = ToolUsingAgent( functions=[calculate_investment_return],
tools=[ openai_api_key=os.getenv("OPENAI_API_KEY"),
research_ira_requirements,
calculate_contribution_limit,
find_brokers,
],
openai_api_key="",
) )
# Run a task
result = agent.run( result = agent.run(
"How can I establish a ROTH IRA to buy stocks and get a tax break? " "Calculate returns for $10000 invested at 7% for 10 years"
"What are the criteria?"
) )
print(json.dumps(result, indent=2))

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry] [tool.poetry]
name = "swarms" name = "swarms"
version = "6.3.6" version = "6.3.7"
description = "Swarms - Pytorch" description = "Swarms - Pytorch"
license = "MIT" license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"] authors = ["Kye Gomez <kye@apac.ai>"]

@ -5,8 +5,8 @@ from loguru import logger
load_dotenv() load_dotenv()
# More reliable string comparison # Disable logging by default
if os.getenv("SWARMS_VERBOSE_GLOBAL", "True").lower() == "false": if os.getenv("SWARMS_VERBOSE_GLOBAL", "False").lower() == "false":
logger.disable("") logger.disable("")
# Import telemetry functions with error handling # Import telemetry functions with error handling

@ -12,7 +12,7 @@ def auto_update():
try: try:
# Check if auto-update is disabled # Check if auto-update is disabled
auto_update_enabled = os.getenv( auto_update_enabled = os.getenv(
"SWARMS_AUTOUPDATE_ON", "true" "SWARMS_AUTOUPDATE_ON", "false"
).lower() ).lower()
if auto_update_enabled == "false": if auto_update_enabled == "false":
logger.info( logger.info(

@ -36,7 +36,7 @@ def scrape_tool_func_docs(fn: Callable) -> str:
f" {inspect.getdoc(fn)}\nParameters:\n{parameters_str}" f" {inspect.getdoc(fn)}\nParameters:\n{parameters_str}"
) )
except Exception as error: except Exception as error:
error_msg = ( (
formatter.print_panel( formatter.print_panel(
f"Error scraping tool function docs {error} try" f"Error scraping tool function docs {error} try"
" optimizing your inputs with different" " optimizing your inputs with different"

@ -0,0 +1,292 @@
import torch
import torch.nn as nn
import torch.distributed as dist
from dataclasses import dataclass
from typing import Optional, Tuple, Union
from loguru import logger
import math
@dataclass
class StarAttentionConfig:
"""Configuration for StarAttention module.
Attributes:
hidden_size: Dimension of the model's hidden states
num_attention_heads: Number of attention heads
num_hosts: Number of hosts in the distributed system
block_size: Size of each context block
anchor_size: Size of the anchor block
dropout_prob: Dropout probability (default: 0.1)
layer_norm_eps: Layer normalization epsilon (default: 1e-12)
"""
hidden_size: int
num_attention_heads: int
num_hosts: int
block_size: int
anchor_size: int
dropout_prob: float = 0.1
layer_norm_eps: float = 1e-12
class StarAttention(nn.Module):
"""
Implementation of Star Attention mechanism for distributed inference.
The module implements a two-phase attention mechanism:
1. Local Context Encoding with Anchor Blocks
2. Query Encoding and Output Generation with Global Attention
"""
def __init__(self, config: StarAttentionConfig):
super().__init__()
if config.hidden_size % config.num_attention_heads != 0:
raise ValueError(
f"Hidden size {config.hidden_size} not divisible by number of attention "
f"heads {config.num_attention_heads}"
)
self.config = config
self.head_dim = (
config.hidden_size // config.num_attention_heads
)
# Initialize components
self.query = nn.Linear(config.hidden_size, config.hidden_size)
self.key = nn.Linear(config.hidden_size, config.hidden_size)
self.value = nn.Linear(config.hidden_size, config.hidden_size)
self.dropout = nn.Dropout(config.dropout_prob)
self.layer_norm = nn.LayerNorm(
config.hidden_size, eps=config.layer_norm_eps
)
# KV cache for storing computed key/value pairs
self.kv_cache = {}
logger.info(
f"Initialized StarAttention with config: {config}"
)
def _split_heads(
self, tensor: torch.Tensor, num_heads: int
) -> torch.Tensor:
"""Split the last dimension into (num_heads, head_dim)."""
batch_size, seq_len, _ = tensor.size()
tensor = tensor.view(
batch_size, seq_len, num_heads, self.head_dim
)
# Transpose to (batch_size, num_heads, seq_len, head_dim)
return tensor.transpose(1, 2)
def _merge_heads(self, tensor: torch.Tensor) -> torch.Tensor:
"""Merge the head dimension back into hidden_size."""
batch_size, _, seq_len, _ = tensor.size()
tensor = tensor.transpose(1, 2)
return tensor.reshape(
batch_size, seq_len, self.config.hidden_size
)
def _compute_attention_scores(
self,
query: torch.Tensor,
key: torch.Tensor,
value: torch.Tensor,
mask: Optional[torch.Tensor] = None,
) -> Tuple[torch.Tensor, torch.Tensor]:
"""Compute attention scores and weighted values."""
# Scale dot-product attention
scores = torch.matmul(
query, key.transpose(-2, -1)
) / math.sqrt(self.head_dim)
if mask is not None:
scores = scores.masked_fill(mask == 0, float("-inf"))
# Online softmax computation
attention_probs = torch.nn.functional.softmax(scores, dim=-1)
attention_probs = self.dropout(attention_probs)
context = torch.matmul(attention_probs, value)
return context, attention_probs
def phase1_local_context_encoding(
self,
input_ids: torch.Tensor,
host_id: int,
device: Union[str, torch.device] = "cuda",
) -> None:
"""
Phase 1: Local Context Encoding with Anchor Blocks
Args:
input_ids: Input tensor of shape (batch_size, seq_len)
host_id: ID of the current host
device: Device to run computations on
"""
logger.debug(f"Starting Phase 1 on host {host_id}")
# Calculate block assignments
block_start = host_id * self.config.block_size
block_end = block_start + self.config.block_size
# Get local block
local_block = input_ids[:, block_start:block_end].to(device)
# Get anchor block (first block)
anchor_block = input_ids[:, : self.config.anchor_size].to(
device
)
# Compute KV pairs for local block
local_hidden = self.layer_norm(local_block)
local_key = self._split_heads(
self.key(local_hidden), self.config.num_attention_heads
)
local_value = self._split_heads(
self.value(local_hidden), self.config.num_attention_heads
)
# Store in KV cache
self.kv_cache[host_id] = {
"key": local_key,
"value": local_value,
"anchor_key": (
None
if host_id == 0
else self._split_heads(
self.key(self.layer_norm(anchor_block)),
self.config.num_attention_heads,
)
),
}
logger.debug(
f"Phase 1 complete on host {host_id}. KV cache shapes - "
f"key: {local_key.shape}, value: {local_value.shape}"
)
def phase2_query_encoding(
self,
query_input: torch.Tensor,
host_id: int,
is_query_host: bool,
device: Union[str, torch.device] = "cuda",
) -> Optional[torch.Tensor]:
"""
Phase 2: Query Encoding and Output Generation
Args:
query_input: Query tensor of shape (batch_size, seq_len, hidden_size)
host_id: ID of the current host
is_query_host: Whether this host is the query host
device: Device to run computations on
Returns:
Output tensor if this is the query host, None otherwise
"""
logger.debug(f"Starting Phase 2 on host {host_id}")
# Transform query
query_hidden = self.layer_norm(query_input)
query = self._split_heads(
self.query(query_hidden), self.config.num_attention_heads
)
# Compute local attention scores
local_context, local_probs = self._compute_attention_scores(
query,
self.kv_cache[host_id]["key"],
self.kv_cache[host_id]["value"],
)
if not is_query_host:
# Non-query hosts send their local attention statistics
dist.send(local_probs, dst=self.config.num_hosts - 1)
return None
# Query host aggregates attention from all hosts
all_attention_probs = [local_probs]
for src_rank in range(self.config.num_hosts - 1):
probs = torch.empty_like(local_probs)
dist.recv(probs, src=src_rank)
all_attention_probs.append(probs)
# Compute global attention
torch.mean(torch.stack(all_attention_probs), dim=0)
# Final output computation
output = self._merge_heads(local_context)
output = self.dropout(output)
logger.debug(
f"Phase 2 complete on host {host_id}. Output shape: {output.shape}"
)
return output
def forward(
self,
input_ids: torch.Tensor,
query_input: torch.Tensor,
host_id: int,
is_query_host: bool,
device: Union[str, torch.device] = "cuda",
) -> Optional[torch.Tensor]:
"""
Forward pass of the StarAttention module.
Args:
input_ids: Input tensor of shape (batch_size, seq_len)
query_input: Query tensor of shape (batch_size, seq_len, hidden_size)
host_id: ID of the current host
is_query_host: Whether this host is the query host
device: Device to run computations on
Returns:
Output tensor if this is the query host, None otherwise
"""
# Phase 1: Local Context Encoding
self.phase1_local_context_encoding(input_ids, host_id, device)
# Phase 2: Query Encoding and Output Generation
return self.phase2_query_encoding(
query_input, host_id, is_query_host, device
)
# Example forward pass
config = StarAttentionConfig(
hidden_size=768,
num_attention_heads=12,
num_hosts=3,
block_size=512,
anchor_size=128,
)
# Initialize model
model = StarAttention(config)
# Example input tensors
batch_size = 4
seq_len = 512
input_ids = torch.randint(
0, 1000, (batch_size, seq_len)
) # Random input IDs
query_input = torch.randn(
batch_size, seq_len, config.hidden_size
) # Random query input
# Example forward pass for query host (host_id = 2)
output = model(
input_ids=input_ids,
query_input=query_input,
host_id=2,
is_query_host=True,
device="cpu",
)
print(output)
Loading…
Cancel
Save