import asyncio
import csv
import os
import time
from datetime import datetime
from typing import Any, Dict, Optional

import requests
from dotenv import load_dotenv
from loguru import logger
from swarms import Agent
from swarm_models import OpenAIChat
from web3 import Web3

BLOCKCHAIN_AGENT_PROMPT = """
You are an expert blockchain and cryptocurrency analyst with deep knowledge of Ethereum markets and DeFi ecosystems.
You have access to real-time ETH price data and transaction information.

For each transaction, analyze:

1. MARKET CONTEXT
- Current ETH price and what this transaction means in USD terms
- How this movement compares to typical market volumes
- Whether this could impact ETH price

2. BEHAVIORAL ANALYSIS
- Whether this appears to be institutional, whale, or protocol movement
- If this fits any known wallet patterns or behaviors
- Signs of smart contract interaction or DeFi activity

3. RISK & IMPLICATIONS
- Potential market impact or price influence
- Signs of potential market manipulation or unusual activity
- Protocol or DeFi risks if applicable

4. STRATEGIC INSIGHTS
- What traders should know about this movement
- Potential chain reactions or follow-up effects
- Market opportunities or risks created

Write naturally but precisely. Focus on actionable insights and important patterns.
Your analysis helps traders and researchers understand significant market movements in real-time."""

# Default RPC endpoint; override with the ETH_RPC_URL environment variable
# so the project id is not forced to live in source.
_DEFAULT_RPC_URL = (
    "https://mainnet.infura.io/v3/9aa3d95b3bc440fa88ea12eaa4456161"
)


class EthereumAnalyzer:
    """Watches new Ethereum blocks and writes AI analyses of large transfers.

    Transactions moving at least ``min_value_eth`` ETH are analyzed by an
    LLM agent (market context, behavior, risk, strategy) and appended to
    ``ethereum_analysis.csv``.
    """

    def __init__(self, min_value_eth: float = 100.0):
        """Connect to an Ethereum node and configure the analysis agent.

        :param min_value_eth: Minimum transaction value (in ETH) to analyze.
        :raises ConnectionError: If the RPC endpoint is unreachable.
        :raises ValueError: If OPENAI_API_KEY is not set.
        """
        load_dotenv()

        logger.add(
            "eth_analysis.log",
            rotation="500 MB",
            retention="10 days",
            level="INFO",
            format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
        )

        rpc_url = os.getenv("ETH_RPC_URL", _DEFAULT_RPC_URL)
        self.w3 = Web3(Web3.HTTPProvider(rpc_url))
        if not self.w3.is_connected():
            raise ConnectionError(
                "Failed to connect to Ethereum network"
            )

        self.min_value_eth = min_value_eth
        # Last block already analyzed; monitor_transactions() advances this
        # so every block is processed exactly once.
        self.last_processed_block = self.w3.eth.block_number
        self.eth_price = self.get_eth_price()
        self.last_price_update = time.time()

        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            raise ValueError(
                "OpenAI API key not found in environment variables"
            )

        model = OpenAIChat(
            openai_api_key=api_key,
            model_name="gpt-4",
            temperature=0.1,
        )

        self.agent = Agent(
            agent_name="Ethereum-Analysis-Agent",
            system_prompt=BLOCKCHAIN_AGENT_PROMPT,
            llm=model,
            max_loops=1,
            autosave=True,
            dashboard=False,
            verbose=True,
            dynamic_temperature_enabled=True,
            saved_state_path="eth_agent.json",
            user_name="eth_analyzer",
            retry_attempts=1,
            context_length=200000,
            output_type="string",
            streaming_on=False,
        )

        self.csv_filename = "ethereum_analysis.csv"
        self.initialize_csv()

    def get_eth_price(self) -> float:
        """Get current ETH price in USD from the CoinGecko API.

        Returns 0.0 on any error so the monitor loop keeps running.
        """
        try:
            response = requests.get(
                "https://api.coingecko.com/api/v3/simple/price",
                params={"ids": "ethereum", "vs_currencies": "usd"},
                timeout=10,  # never hang the monitor loop on a slow API
            )
            return float(response.json()["ethereum"]["usd"])
        except Exception as e:
            logger.error(f"Error fetching ETH price: {str(e)}")
            return 0.0

    def update_eth_price(self):
        """Refresh the cached ETH price if more than 5 minutes have passed."""
        if time.time() - self.last_price_update > 300:  # 5 minutes
            self.eth_price = self.get_eth_price()
            self.last_price_update = time.time()
            logger.info(f"Updated ETH price: ${self.eth_price:,.2f}")

    def initialize_csv(self):
        """Create the output CSV with a header row if it does not exist."""
        headers = [
            "timestamp",
            "transaction_hash",
            "from_address",
            "to_address",
            "value_eth",
            "value_usd",
            "eth_price",
            "gas_used",
            "gas_price_gwei",
            "block_number",
            "analysis",
        ]

        if not os.path.exists(self.csv_filename):
            with open(self.csv_filename, "w", newline="") as f:
                writer = csv.writer(f)
                writer.writerow(headers)

    async def analyze_transaction(
        self, tx_hash: str
    ) -> Optional[Dict[str, Any]]:
        """Build an analysis record for one transaction.

        :param tx_hash: Transaction hash (HexBytes or hex string).
        :return: Dict of transaction facts, or None if the transaction is
            below the value threshold or lookup fails.
        """
        try:
            tx = self.w3.eth.get_transaction(tx_hash)
            receipt = self.w3.eth.get_transaction_receipt(tx_hash)

            value_eth = float(self.w3.from_wei(tx.value, "ether"))

            if value_eth < self.min_value_eth:
                return None

            block = self.w3.eth.get_block(tx.blockNumber)

            # Refresh price lazily so USD figures stay current.
            self.update_eth_price()

            value_usd = value_eth * self.eth_price

            analysis = {
                "timestamp": datetime.fromtimestamp(
                    block.timestamp
                ).isoformat(),
                "transaction_hash": tx_hash.hex(),
                "from_address": tx["from"],
                "to_address": tx.to if tx.to else "Contract Creation",
                "value_eth": value_eth,
                "value_usd": value_usd,
                "eth_price": self.eth_price,
                "gas_used": receipt.gasUsed,
                "gas_price_gwei": float(
                    self.w3.from_wei(tx.gasPrice, "gwei")
                ),
                "block_number": tx.blockNumber,
            }

            # A non-empty code blob at the destination means a contract call.
            if tx.to:
                code = self.w3.eth.get_code(tx.to)
                analysis["is_contract"] = len(code) > 0

                if analysis["is_contract"]:
                    analysis["events"] = receipt.logs

            return analysis

        except Exception as e:
            logger.error(
                f"Error analyzing transaction {tx_hash}: {str(e)}"
            )
            return None

    def prepare_analysis_prompt(self, tx_data: Dict[str, Any]) -> str:
        """Prepare detailed analysis prompt including price context."""
        value_usd = tx_data["value_usd"]
        eth_price = tx_data["eth_price"]

        prompt = f"""Analyze this Ethereum transaction in current market context:

Transaction Details:
- Value: {tx_data['value_eth']:.2f} ETH (${value_usd:,.2f} at current price)
- Current ETH Price: ${eth_price:,.2f}
- From: {tx_data['from_address']}
- To: {tx_data['to_address']}
- Contract Interaction: {tx_data.get('is_contract', False)}
- Gas Used: {tx_data['gas_used']:,} units
- Gas Price: {tx_data['gas_price_gwei']:.2f} Gwei
- Block: {tx_data['block_number']}
- Timestamp: {tx_data['timestamp']}

{f"Event Count: {len(tx_data['events'])} events" if tx_data.get('events') else "No contract events"}

Consider the transaction's significance given the current ETH price of ${eth_price:,.2f} and total USD value of ${value_usd:,.2f}.
Analyze market impact, patterns, risks, and strategic implications."""

        return prompt

    def save_to_csv(self, tx_data: Dict[str, Any], ai_analysis: str):
        """Append one transaction record plus its AI analysis to the CSV."""
        row = [
            tx_data["timestamp"],
            tx_data["transaction_hash"],
            tx_data["from_address"],
            tx_data["to_address"],
            tx_data["value_eth"],
            tx_data["value_usd"],
            tx_data["eth_price"],
            tx_data["gas_used"],
            tx_data["gas_price_gwei"],
            tx_data["block_number"],
            ai_analysis.replace("\n", " "),
        ]

        with open(self.csv_filename, "a", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(row)

    async def monitor_transactions(self):
        """Monitor the chain and analyze qualifying transactions.

        Fix over the original: track ``last_processed_block`` so each block
        is analyzed exactly once.  The previous loop re-read the head block
        every second, re-analyzing the same block and skipping any block
        mined between polls.
        """
        logger.info(
            f"Starting transaction monitor (minimum value: {self.min_value_eth} ETH)"
        )

        while True:
            try:
                latest_block = self.w3.eth.block_number

                # Catch up on every block we have not seen yet.
                while self.last_processed_block < latest_block:
                    block_number = self.last_processed_block + 1
                    block = self.w3.eth.get_block(
                        block_number, full_transactions=True
                    )

                    for tx in block.transactions:
                        tx_analysis = await self.analyze_transaction(
                            tx.hash
                        )

                        if tx_analysis:
                            analysis_prompt = (
                                self.prepare_analysis_prompt(tx_analysis)
                            )
                            ai_analysis = self.agent.run(analysis_prompt)

                            self.save_to_csv(tx_analysis, ai_analysis)

                            # Single consolidated console report (the
                            # original printed ai_analysis twice).
                            print("\n" + "=" * 50)
                            print("New Transaction Analysis")
                            print(
                                f"Hash: {tx_analysis['transaction_hash']}"
                            )
                            print(
                                f"Value: {tx_analysis['value_eth']:.2f} ETH (${tx_analysis['value_usd']:,.2f})"
                            )
                            print(
                                f"Current ETH Price: ${self.eth_price:,.2f}"
                            )
                            print("=" * 50)
                            print(ai_analysis)
                            print("=" * 50 + "\n")

                    self.last_processed_block = block_number

                await asyncio.sleep(1)  # Wait for next block

            except Exception as e:
                logger.error(f"Error in monitoring loop: {str(e)}")
                await asyncio.sleep(1)


async def main():
    """Entry point for the analysis system."""
    analyzer = EthereumAnalyzer(min_value_eth=100.0)
    await analyzer.monitor_transactions()


if __name__ == "__main__":
    print("Starting Ethereum Transaction Analyzer...")
    print("Saving results to ethereum_analysis.csv")
    print("Press Ctrl+C to stop")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nStopping analyzer...")
This suggests that the transaction was not rushed and that the sender was willing to wait for this transaction to be confirmed, which might hint towards the non-urgent nature of the transaction. As for potential risk factors or security concerns, the transaction itself appears to be standard and doesn't raise any immediate red flags. However, the parties involved should always be cautious about the address security, maintaining privacy, and avoiding social engineering attacks. For traders and investors, this transaction can be interpreted as a potential bullish sign if it signifies increased liquidity and investment in the Ethereum market, especially if it's followed by similar large transfers. However, due to the anonymous nature of the transaction, it's critical to combine this with other market indicators and not to rely solely on transaction analysis for investment decisions." +2024-11-27T13:52:23,b98bcbf6d57a158b67a126d8f023766e03fb15c3e74becc1189d4244fda61a13,0xEae7380dD4CeF6fbD1144F49E4D1e6964258A4F4,0x28C6c06298d514Db089934071355E5743bf21d60,401.99463589018103,21000,14.978063737,21281887,"Ethereum-Analysis-Agent: Transaction Analysis: This transaction marks a significant transfer of 401.99 ETH, approximately $845,000 at the current rate. The transaction did not involve any smart contract interaction, suggesting a simple fund transfer rather than a complicated operation or interaction with a DeFi protocol. From a broader market perspective, this transaction is meaningful but not as potentially impactful as larger transactions. It can nonetheless be part of a larger pattern of asset movement within the Ethereum ecosystem. If this transaction is part of larger investment activities, it could suggest an increase in demand for ETH and potentially impact its price. Without contract interaction, it's challenging to assess direct implications for DeFi protocols. 
However, the substantial ETH transfer could suggest a step towards participation in DeFi activities, or a movement of funds between different DeFi platforms. The transaction fee appears reasonable, given the Gas Used and Gas Price. This implies that the transaction wasn't urgent, and the sender was willing to wait for the transaction to be confirmed, indicating a non-critical movement of funds. In terms of security and risk factors, there are no immediate concerns from the transaction itself. Nevertheless, as with any crypto transaction, the parties involved should ensure secure storage of their keys, maintain privacy, and be wary of potential phishing or social engineering attacks. For traders and investors, this transaction could be seen as a bullish sign if it forms part of a trend of increased investment activities in the Ethereum market. However, it's important to remember that transaction analysis should be combined with other market indicators due to the anonymous nature of blockchain transactions." +2024-11-27T13:59:47,a985b74fd3dfee09cbe4a2e6890509e583a3f0ce13f68c98e82996e0f66428be,0xf7858Da8a6617f7C6d0fF2bcAFDb6D2eeDF64840,0xA294cCa691e4C83B1fc0c8d63D9a3eeF0A196DE1,136.0668,494665.408728,3635.46,21000,18.866443971,21281923,"1. MARKET CONTEXT The transaction of 136.07 ETH, equivalent to $494,665.41, is a significant movement in the Ethereum market. However, compared to the daily trading volume of Ethereum, which often exceeds billions of dollars, this transaction is not large enough to significantly impact the ETH price on its own. 2. BEHAVIORAL ANALYSIS The transaction does not appear to be a protocol movement as there is no contract interaction involved. It could be a whale movement, given the substantial amount of ETH transferred. However, without additional information about the wallets involved, it's difficult to definitively determine the nature of the transaction. 
The gas price of 18.87 Gwei is relatively standard, suggesting that the transaction was not urgent or time-sensitive. 3. RISK & IMPLICATIONS The transaction does not show signs of market manipulation or unusual activity. The absence of contract interaction suggests that this transaction does not directly involve DeFi protocols, reducing the risk of smart contract vulnerabilities or DeFi-related risks. However, the large amount of ETH transferred could potentially influence market sentiment if it is part of a larger trend of similar transactions. 4. STRATEGIC INSIGHTS Traders should note this transaction as part of the broader market activity. While a single transaction of this size is unlikely to significantly impact the market, a series of similar transactions could indicate a larger trend. If this is part of a larger movement of ETH out of exchanges, it could suggest a decrease in selling pressure, which could be bullish for ETH. Conversely, if this is part of a larger movement into exchanges, it could indicate an increase in selling pressure, which could be bearish for ETH. Traders should monitor the market for further similar transactions to gain a better understanding of the potential market trends." 
import os
from typing import List, Dict, Any, Optional, Callable
from dataclasses import dataclass, field
import json
from datetime import datetime
import inspect
import typing
from typing import Union
from swarms import Agent
from swarm_models import OpenAIChat


def extract_type_hints(func: Callable) -> Dict[str, Any]:
    """Extract parameter types from function type hints."""
    return typing.get_type_hints(func)


# NOTE(review): ToolDefinition, extract_tool_info and the docstring-parsing
# helpers between the diff hunks are unchanged by this patch and not visible
# in this chunk; they are intentionally not reproduced here.


@dataclass
class FunctionSpec:
    """Specification for a callable tool function."""

    name: str
    description: str
    # Maps parameter name -> {"type", "type_class", "description", "required"}
    parameters: Dict[str, dict]
    return_type: str
    return_description: str


@dataclass
class ExecutionStep:
    """Represents a single step in the execution plan."""

    step_id: int
    function_name: str
    parameters: Dict[str, Any]
    expected_output: str
    completed: bool = False
    result: Any = None


@dataclass
class ExecutionContext:
    """Maintains state during execution."""

    task: str
    steps: List[ExecutionStep] = field(default_factory=list)
    results: Dict[int, Any] = field(default_factory=dict)
    current_step: int = 0
    history: List[Dict[str, Any]] = field(default_factory=list)


class ToolAgent:
    """LLM-driven agent that plans and executes calls to Python functions."""

    def __init__(
        self,
        functions: List[Callable],
        openai_api_key: str,
        model_name: str = "gpt-4",
        temperature: float = 0.1,
    ):
        """Build function specs, the model, and the planning agent.

        :param functions: Callables the agent may invoke.
        :param openai_api_key: OpenAI API key for the chat model.
        :param model_name: Chat model to use.
        :param temperature: Sampling temperature for the model.
        """
        self.functions = {func.__name__: func for func in functions}
        self.function_specs = self._analyze_functions(functions)

        self.model = OpenAIChat(
            openai_api_key=openai_api_key,
            model_name=model_name,
            temperature=temperature,
        )

        self.system_prompt = self._create_system_prompt()
        self.agent = Agent(
            agent_name="Tool-Agent",
            system_prompt=self.system_prompt,
            llm=self.model,
            max_loops=1,
            verbose=True,
        )

    def _analyze_functions(
        self, functions: List[Callable]
    ) -> Dict[str, FunctionSpec]:
        """Analyze functions to create detailed specifications.

        Fixes over the original: uses ``typing.get_type_hints`` (the bare
        ``get_type_hints`` name was never imported -> NameError), and resets
        ``return_desc`` per function (it previously leaked from one
        function's docstring into the next via a ``locals()`` check).
        """
        specs = {}
        for func in functions:
            hints = typing.get_type_hints(func)
            sig = inspect.signature(func)
            doc = inspect.getdoc(func) or ""

            # Parse docstring (reST style) for parameter/return descriptions.
            param_descriptions = {}
            return_desc = ""
            for line in doc.split("\n"):
                if ":param" in line:
                    param_name = (
                        line.split(":param")[1].split(":")[0].strip()
                    )
                    desc = line.split(":", 2)[-1].strip()
                    param_descriptions[param_name] = desc
                elif ":return:" in line:
                    return_desc = line.split(":return:")[1].strip()

            # Build parameter specifications from the signature.
            parameters = {}
            for name, param in sig.parameters.items():
                param_type = hints.get(name, Any)
                parameters[name] = {
                    "type": str(param_type),
                    "type_class": param_type,
                    "description": param_descriptions.get(name, ""),
                    "required": param.default == param.empty,
                }

            specs[func.__name__] = FunctionSpec(
                name=func.__name__,
                description=doc.split("\n")[0],
                parameters=parameters,
                return_type=str(hints.get("return", Any)),
                return_description=return_desc,
            )

        return specs

    def _create_system_prompt(self) -> str:
        """Create system prompt with detailed function specifications."""
        functions_desc = []
        for spec in self.function_specs.values():
            params_desc = []
            for name, details in spec.parameters.items():
                params_desc.append(
                    f"    - {name}: {details['type']} - {details['description']}"
                )

            functions_desc.append(
                f"""
Function: {spec.name}
Description: {spec.description}
Parameters:
{chr(10).join(params_desc)}
Returns: {spec.return_type} - {spec.return_description}
                """
            )

        return f"""You are an AI agent that creates and executes plans using available functions.

Available Functions:
{chr(10).join(functions_desc)}

You must respond in two formats depending on the phase:

1. Planning Phase:
{{
    "phase": "planning",
    "plan": {{
        "description": "Overall plan description",
        "steps": [
            {{
                "step_id": 1,
                "function": "function_name",
                "parameters": {{
                    "param1": "value1",
                    "param2": "value2"
                }},
                "purpose": "Why this step is needed"
            }}
        ]
    }}
}}

2. Execution Phase:
{{
    "phase": "execution",
    "analysis": "Analysis of current result",
    "next_action": {{
        "type": "continue|request_input|complete",
        "reason": "Why this action was chosen",
        "needed_input": {{}}  # If requesting input
    }}
}}

Always:
- Use exact function names
- Ensure parameter types match specifications
- Provide clear reasoning for each decision
"""

    def _execute_function(
        self, spec: FunctionSpec, parameters: Dict[str, Any]
    ) -> Any:
        """Execute a function with type checking.

        :raises ValueError: If a parameter cannot be converted to the
            annotated type.
        """
        converted_params = {}
        for name, value in parameters.items():
            param_spec = spec.parameters[name]
            try:
                # Convert scalar values to the annotated primitive type;
                # pass anything else through untouched.
                param_type = param_spec["type_class"]
                if param_type in (int, float, str, bool):
                    converted_params[name] = param_type(value)
                else:
                    converted_params[name] = value
            except (ValueError, TypeError) as e:
                raise ValueError(
                    f"Parameter '{name}' conversion failed: {str(e)}"
                )

        return self.functions[spec.name](**converted_params)

    def run(self, task: str) -> Dict[str, Any]:
        """Execute task with planning and step-by-step execution.

        Fix over the original: the step counter always advances.  The old
        loop hit ``continue`` without incrementing ``current_step`` when the
        agent reported "complete" with steps remaining, re-executing the
        same step forever.
        """
        context = ExecutionContext(task=task)
        execution_log = {
            "task": task,
            "start_time": datetime.utcnow().isoformat(),
            "steps": [],
            "final_result": None,
        }

        try:
            # Planning phase
            plan_prompt = f"Create a plan to: {task}"
            plan_response = self.agent.run(plan_prompt)
            plan_data = json.loads(
                plan_response.replace("System:", "").strip()
            )

            # Convert plan to execution steps
            for step in plan_data["plan"]["steps"]:
                context.steps.append(
                    ExecutionStep(
                        step_id=step["step_id"],
                        function_name=step["function"],
                        parameters=step["parameters"],
                        expected_output=step["purpose"],
                    )
                )

            # Execution phase
            while context.current_step < len(context.steps):
                step = context.steps[context.current_step]
                print(
                    f"\nExecuting step {step.step_id}: {step.function_name}"
                )

                try:
                    spec = self.function_specs[step.function_name]
                    result = self._execute_function(
                        spec, step.parameters
                    )
                    context.results[step.step_id] = result
                    step.completed = True
                    step.result = result

                    # Ask the agent to analyze the result and decide
                    # whether to continue.
                    analysis_prompt = f"""
                    Step {step.step_id} completed:
                    Function: {step.function_name}
                    Result: {json.dumps(result)}
                    Remaining steps: {len(context.steps) - context.current_step - 1}

                    Analyze the result and decide next action.
                    """

                    analysis_response = self.agent.run(
                        analysis_prompt
                    )
                    analysis_data = json.loads(
                        analysis_response.replace(
                            "System:", ""
                        ).strip()
                    )

                    execution_log["steps"].append(
                        {
                            "step_id": step.step_id,
                            "function": step.function_name,
                            "parameters": step.parameters,
                            "result": result,
                            "analysis": analysis_data,
                        }
                    )

                    # Always advance; stop early only when the agent says
                    # "complete" and no steps remain.
                    context.current_step += 1
                    if (
                        analysis_data["next_action"]["type"]
                        == "complete"
                        and context.current_step >= len(context.steps)
                    ):
                        break

                except Exception as e:
                    print(f"Error in step {step.step_id}: {str(e)}")
                    execution_log["steps"].append(
                        {
                            "step_id": step.step_id,
                            "function": step.function_name,
                            "parameters": step.parameters,
                            "error": str(e),
                        }
                    )
                    raise

            # Final analysis
            final_prompt = f"""
            Task completed. Results:
            {json.dumps(context.results, indent=2)}

            Provide final analysis and recommendations.
            """

            final_analysis = self.agent.run(final_prompt)
            execution_log["final_result"] = {
                "success": True,
                "results": context.results,
                "analysis": json.loads(
                    final_analysis.replace("System:", "").strip()
                ),
            }

        except Exception as e:
            execution_log["final_result"] = {
                "success": False,
                "error": str(e),
            }

        execution_log["end_time"] = datetime.utcnow().isoformat()
        return execution_log


def calculate_investment_return(
    principal: float, rate: float, years: int
) -> float:
    """Calculate investment return with compound interest.

    :param principal: Initial investment amount in dollars
    :param rate: Annual interest rate as decimal (e.g., 0.07 for 7%)
    :param years: Number of years to invest
    :return: Final investment value
    """
    return principal * (1 + rate) ** years


if __name__ == "__main__":
    # Guarded entry point: the original ran the agent at import time,
    # issuing API calls (with a possibly-missing key) on every import.
    agent = ToolAgent(
        functions=[calculate_investment_return],
        openai_api_key=os.getenv("OPENAI_API_KEY"),
    )

    result = agent.run(
        "Calculate returns for $10000 invested at 7% for 10 years"
    )

    print(json.dumps(result, indent=2))
diff --git a/test.py b/test.py new file mode 100644 index 00000000..ce12ec1c --- /dev/null +++ b/test.py @@ -0,0 +1,292 @@ +import torch +import torch.nn as nn +import torch.distributed as dist +from dataclasses import dataclass +from typing import Optional, Tuple, Union +from loguru import logger +import math + + +@dataclass +class StarAttentionConfig: + """Configuration for StarAttention module. + + Attributes: + hidden_size: Dimension of the model's hidden states + num_attention_heads: Number of attention heads + num_hosts: Number of hosts in the distributed system + block_size: Size of each context block + anchor_size: Size of the anchor block + dropout_prob: Dropout probability (default: 0.1) + layer_norm_eps: Layer normalization epsilon (default: 1e-12) + """ + + hidden_size: int + num_attention_heads: int + num_hosts: int + block_size: int + anchor_size: int + dropout_prob: float = 0.1 + layer_norm_eps: float = 1e-12 + + +class StarAttention(nn.Module): + """ + Implementation of Star Attention mechanism for distributed inference. + + The module implements a two-phase attention mechanism: + 1. Local Context Encoding with Anchor Blocks + 2. 
Query Encoding and Output Generation with Global Attention + """ + + def __init__(self, config: StarAttentionConfig): + super().__init__() + + if config.hidden_size % config.num_attention_heads != 0: + raise ValueError( + f"Hidden size {config.hidden_size} not divisible by number of attention " + f"heads {config.num_attention_heads}" + ) + + self.config = config + self.head_dim = ( + config.hidden_size // config.num_attention_heads + ) + + # Initialize components + self.query = nn.Linear(config.hidden_size, config.hidden_size) + self.key = nn.Linear(config.hidden_size, config.hidden_size) + self.value = nn.Linear(config.hidden_size, config.hidden_size) + + self.dropout = nn.Dropout(config.dropout_prob) + self.layer_norm = nn.LayerNorm( + config.hidden_size, eps=config.layer_norm_eps + ) + + # KV cache for storing computed key/value pairs + self.kv_cache = {} + + logger.info( + f"Initialized StarAttention with config: {config}" + ) + + def _split_heads( + self, tensor: torch.Tensor, num_heads: int + ) -> torch.Tensor: + """Split the last dimension into (num_heads, head_dim).""" + batch_size, seq_len, _ = tensor.size() + tensor = tensor.view( + batch_size, seq_len, num_heads, self.head_dim + ) + # Transpose to (batch_size, num_heads, seq_len, head_dim) + return tensor.transpose(1, 2) + + def _merge_heads(self, tensor: torch.Tensor) -> torch.Tensor: + """Merge the head dimension back into hidden_size.""" + batch_size, _, seq_len, _ = tensor.size() + tensor = tensor.transpose(1, 2) + return tensor.reshape( + batch_size, seq_len, self.config.hidden_size + ) + + def _compute_attention_scores( + self, + query: torch.Tensor, + key: torch.Tensor, + value: torch.Tensor, + mask: Optional[torch.Tensor] = None, + ) -> Tuple[torch.Tensor, torch.Tensor]: + """Compute attention scores and weighted values.""" + # Scale dot-product attention + scores = torch.matmul( + query, key.transpose(-2, -1) + ) / math.sqrt(self.head_dim) + + if mask is not None: + scores = 
scores.masked_fill(mask == 0, float("-inf")) + + # Online softmax computation + attention_probs = torch.nn.functional.softmax(scores, dim=-1) + attention_probs = self.dropout(attention_probs) + + context = torch.matmul(attention_probs, value) + + return context, attention_probs + + def phase1_local_context_encoding( + self, + input_ids: torch.Tensor, + host_id: int, + device: Union[str, torch.device] = "cuda", + ) -> None: + """ + Phase 1: Local Context Encoding with Anchor Blocks + + Args: + input_ids: Input tensor of shape (batch_size, seq_len) + host_id: ID of the current host + device: Device to run computations on + """ + logger.debug(f"Starting Phase 1 on host {host_id}") + + # Calculate block assignments + block_start = host_id * self.config.block_size + block_end = block_start + self.config.block_size + + # Get local block + local_block = input_ids[:, block_start:block_end].to(device) + + # Get anchor block (first block) + anchor_block = input_ids[:, : self.config.anchor_size].to( + device + ) + + # Compute KV pairs for local block + local_hidden = self.layer_norm(local_block) + local_key = self._split_heads( + self.key(local_hidden), self.config.num_attention_heads + ) + local_value = self._split_heads( + self.value(local_hidden), self.config.num_attention_heads + ) + + # Store in KV cache + self.kv_cache[host_id] = { + "key": local_key, + "value": local_value, + "anchor_key": ( + None + if host_id == 0 + else self._split_heads( + self.key(self.layer_norm(anchor_block)), + self.config.num_attention_heads, + ) + ), + } + + logger.debug( + f"Phase 1 complete on host {host_id}. 
KV cache shapes - " + f"key: {local_key.shape}, value: {local_value.shape}" + ) + + def phase2_query_encoding( + self, + query_input: torch.Tensor, + host_id: int, + is_query_host: bool, + device: Union[str, torch.device] = "cuda", + ) -> Optional[torch.Tensor]: + """ + Phase 2: Query Encoding and Output Generation + + Args: + query_input: Query tensor of shape (batch_size, seq_len, hidden_size) + host_id: ID of the current host + is_query_host: Whether this host is the query host + device: Device to run computations on + + Returns: + Output tensor if this is the query host, None otherwise + """ + logger.debug(f"Starting Phase 2 on host {host_id}") + + # Transform query + query_hidden = self.layer_norm(query_input) + query = self._split_heads( + self.query(query_hidden), self.config.num_attention_heads + ) + + # Compute local attention scores + local_context, local_probs = self._compute_attention_scores( + query, + self.kv_cache[host_id]["key"], + self.kv_cache[host_id]["value"], + ) + + if not is_query_host: + # Non-query hosts send their local attention statistics + dist.send(local_probs, dst=self.config.num_hosts - 1) + return None + + # Query host aggregates attention from all hosts + all_attention_probs = [local_probs] + for src_rank in range(self.config.num_hosts - 1): + probs = torch.empty_like(local_probs) + dist.recv(probs, src=src_rank) + all_attention_probs.append(probs) + + # Compute global attention + torch.mean(torch.stack(all_attention_probs), dim=0) + + # Final output computation + output = self._merge_heads(local_context) + output = self.dropout(output) + + logger.debug( + f"Phase 2 complete on host {host_id}. Output shape: {output.shape}" + ) + + return output + + def forward( + self, + input_ids: torch.Tensor, + query_input: torch.Tensor, + host_id: int, + is_query_host: bool, + device: Union[str, torch.device] = "cuda", + ) -> Optional[torch.Tensor]: + """ + Forward pass of the StarAttention module. 
+ + Args: + input_ids: Input tensor of shape (batch_size, seq_len) + query_input: Query tensor of shape (batch_size, seq_len, hidden_size) + host_id: ID of the current host + is_query_host: Whether this host is the query host + device: Device to run computations on + + Returns: + Output tensor if this is the query host, None otherwise + """ + # Phase 1: Local Context Encoding + self.phase1_local_context_encoding(input_ids, host_id, device) + + # Phase 2: Query Encoding and Output Generation + return self.phase2_query_encoding( + query_input, host_id, is_query_host, device + ) + + +# Example forward pass +config = StarAttentionConfig( + hidden_size=768, + num_attention_heads=12, + num_hosts=3, + block_size=512, + anchor_size=128, +) + +# Initialize model +model = StarAttention(config) + +# Example input tensors +batch_size = 4 +seq_len = 512 +input_ids = torch.randint( + 0, 1000, (batch_size, seq_len) +) # Random input IDs +query_input = torch.randn( + batch_size, seq_len, config.hidden_size +) # Random query input + +# Example forward pass for query host (host_id = 2) +output = model( + input_ids=input_ids, + query_input=query_input, + host_id=2, + is_query_host=True, + device="cpu", +) + +print(output)