You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
swarms/ethchain_agent.py

309 lines
10 KiB

import os
from swarms import Agent
from swarm_models import OpenAIChat
from web3 import Web3
from typing import Dict, Optional, Any
from datetime import datetime
import asyncio
from loguru import logger
from dotenv import load_dotenv
import csv
import requests
import time
# System prompt handed to the analysis agent (passed as `system_prompt` when
# EthereumAnalyzer builds its Agent). It lists the four angles the model is
# asked to cover for every transaction it is shown.
BLOCKCHAIN_AGENT_PROMPT = """
You are an expert blockchain and cryptocurrency analyst with deep knowledge of Ethereum markets and DeFi ecosystems.
You have access to real-time ETH price data and transaction information.
For each transaction, analyze:
1. MARKET CONTEXT
- Current ETH price and what this transaction means in USD terms
- How this movement compares to typical market volumes
- Whether this could impact ETH price
2. BEHAVIORAL ANALYSIS
- Whether this appears to be institutional, whale, or protocol movement
- If this fits any known wallet patterns or behaviors
- Signs of smart contract interaction or DeFi activity
3. RISK & IMPLICATIONS
- Potential market impact or price influence
- Signs of potential market manipulation or unusual activity
- Protocol or DeFi risks if applicable
4. STRATEGIC INSIGHTS
- What traders should know about this movement
- Potential chain reactions or follow-up effects
- Market opportunities or risks created
Write naturally but precisely. Focus on actionable insights and important patterns.
Your analysis helps traders and researchers understand significant market movements in real-time."""
class EthereumAnalyzer:
    """Watch Ethereum blocks for large transfers and have an LLM agent analyse them.

    For every transaction whose value is at least ``min_value_eth`` the
    analyzer collects on-chain details, prices the transfer in USD, asks the
    agent for a written analysis, appends the result to a CSV file and prints
    a short report.

    Attributes set by ``__init__``:
        w3: connected ``Web3`` client.
        min_value_eth: minimum transfer size (in ETH) worth analysing.
        last_processed_block: number of the newest block already handled.
        eth_price: last successfully fetched ETH/USD price (0.0 if none yet).
        last_price_update: ``time.time()`` of the last price refresh attempt.
        agent: the LLM agent used to produce the written analysis.
        csv_filename: path of the CSV file results are appended to.
    """

    # NOTE(security): this URL embeds a provider project key that is committed
    # to source control. It is kept only as a backward-compatible fallback;
    # prefer passing ``rpc_url`` or setting ETH_RPC_URL, and rotate the key.
    DEFAULT_RPC_URL = (
        "https://mainnet.infura.io/v3/9aa3d95b3bc440fa88ea12eaa4456161"
    )
    PRICE_API_URL = "https://api.coingecko.com/api/v3/simple/price"
    PRICE_REFRESH_SECONDS = 300  # 5 minutes
    PRICE_REQUEST_TIMEOUT = 10  # seconds; without it a stalled request blocks forever
    CSV_HEADERS = [
        "timestamp",
        "transaction_hash",
        "from_address",
        "to_address",
        "value_eth",
        "value_usd",
        "eth_price",
        "gas_used",
        "gas_price_gwei",
        "block_number",
        "analysis",
    ]

    def __init__(
        self,
        min_value_eth: float = 100.0,
        rpc_url: Optional[str] = None,
    ):
        """Connect to Ethereum, fetch the ETH price and build the agent.

        Args:
            min_value_eth: transfers below this many ETH are ignored.
            rpc_url: HTTP JSON-RPC endpoint. When omitted, the ETH_RPC_URL
                environment variable is used, then ``DEFAULT_RPC_URL``.

        Raises:
            ConnectionError: the RPC endpoint is not reachable.
            ValueError: OPENAI_API_KEY is not set in the environment.
        """
        # Load .env first so the os.getenv() lookups below can see its values.
        load_dotenv()
        logger.add(
            "eth_analysis.log",
            rotation="500 MB",
            retention="10 days",
            level="INFO",
            format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
        )

        endpoint = rpc_url or os.getenv("ETH_RPC_URL") or self.DEFAULT_RPC_URL
        self.w3 = Web3(Web3.HTTPProvider(endpoint))
        if not self.w3.is_connected():
            raise ConnectionError(
                "Failed to connect to Ethereum network"
            )

        self.min_value_eth = min_value_eth
        # Monitoring resumes from the block after this one.
        self.last_processed_block = self.w3.eth.block_number
        self.eth_price = self.get_eth_price()
        self.last_price_update = time.time()

        # Initialize AI agent
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            raise ValueError(
                "OpenAI API key not found in environment variables"
            )
        model = OpenAIChat(
            openai_api_key=api_key,
            model_name="gpt-4",
            temperature=0.1,
        )
        self.agent = Agent(
            agent_name="Ethereum-Analysis-Agent",
            system_prompt=BLOCKCHAIN_AGENT_PROMPT,
            llm=model,
            max_loops=1,
            autosave=True,
            dashboard=False,
            verbose=True,
            dynamic_temperature_enabled=True,
            saved_state_path="eth_agent.json",
            user_name="eth_analyzer",
            retry_attempts=1,
            context_length=200000,
            output_type="string",
            streaming_on=False,
        )

        self.csv_filename = "ethereum_analysis.csv"
        self.initialize_csv()

    def get_eth_price(self) -> float:
        """Get current ETH price from CoinGecko API.

        Returns:
            The ETH/USD price, or 0.0 when the request or parsing fails
            (the failure is logged, never raised).
        """
        try:
            response = requests.get(
                self.PRICE_API_URL,
                params={"ids": "ethereum", "vs_currencies": "usd"},
                timeout=self.PRICE_REQUEST_TIMEOUT,
            )
            # Surface HTTP errors (e.g. rate limiting) as a clear message
            # instead of a KeyError on the error payload.
            response.raise_for_status()
            return float(response.json()["ethereum"]["usd"])
        except Exception as e:
            # Deliberately best-effort: price lookup must not stop monitoring.
            logger.error(f"Error fetching ETH price: {str(e)}")
            return 0.0

    def update_eth_price(self):
        """Update ETH price if more than 5 minutes have passed.

        A failed refresh keeps the last known price instead of replacing it
        with 0.0, which would zero every subsequent USD valuation.
        """
        if time.time() - self.last_price_update <= self.PRICE_REFRESH_SECONDS:
            return

        new_price = self.get_eth_price()
        # Stamp the attempt even on failure so a price-API outage does not
        # trigger a new request for every single transaction.
        self.last_price_update = time.time()
        if new_price > 0:
            self.eth_price = new_price
            logger.info(f"Updated ETH price: ${self.eth_price:,.2f}")
        else:
            logger.warning(
                f"ETH price refresh failed; keeping last known price: ${self.eth_price:,.2f}"
            )

    def initialize_csv(self):
        """Initialize CSV file with headers (no-op if the file already exists)."""
        if os.path.exists(self.csv_filename):
            return
        # Explicit UTF-8 so the file does not depend on the platform locale.
        with open(
            self.csv_filename, "w", newline="", encoding="utf-8"
        ) as f:
            csv.writer(f).writerow(self.CSV_HEADERS)

    async def analyze_transaction(
        self, tx_hash: str
    ) -> Optional[Dict[str, Any]]:
        """Analyze a single transaction.

        Args:
            tx_hash: transaction hash, either a hex string or a bytes-like
                object exposing ``.hex()``.

        Returns:
            A dict of transaction facts (keys match the CSV columns, plus
            ``is_contract`` when the transaction has a recipient and
            ``events`` when that recipient has code), or ``None`` when the
            value is below ``min_value_eth`` or any lookup fails.
        """
        try:
            tx = self.w3.eth.get_transaction(tx_hash)
            value_eth = float(self.w3.from_wei(tx.value, "ether"))
            if value_eth < self.min_value_eth:
                return None

            # Only pay for the receipt/block round-trips once the
            # transaction is known to be large enough.
            receipt = self.w3.eth.get_transaction_receipt(tx_hash)
            block = self.w3.eth.get_block(tx.blockNumber)

            # Update ETH price if needed
            self.update_eth_price()
            value_usd = value_eth * self.eth_price

            # Accept both hex strings and bytes-like hashes.
            tx_hash_hex = (
                tx_hash if isinstance(tx_hash, str) else tx_hash.hex()
            )

            analysis = {
                "timestamp": datetime.fromtimestamp(
                    block.timestamp
                ).isoformat(),
                "transaction_hash": tx_hash_hex,
                "from_address": tx["from"],
                "to_address": tx.to if tx.to else "Contract Creation",
                "value_eth": value_eth,
                "value_usd": value_usd,
                "eth_price": self.eth_price,
                "gas_used": receipt.gasUsed,
                "gas_price_gwei": float(
                    self.w3.from_wei(tx.gasPrice, "gwei")
                ),
                "block_number": tx.blockNumber,
            }

            # Check if it's a contract
            if tx.to:
                code = self.w3.eth.get_code(tx.to)
                analysis["is_contract"] = len(code) > 0
                # Get contract events
                if analysis["is_contract"]:
                    analysis["events"] = receipt.logs

            return analysis
        except Exception as e:
            logger.error(
                f"Error analyzing transaction {tx_hash}: {str(e)}"
            )
            return None

    def prepare_analysis_prompt(self, tx_data: Dict[str, Any]) -> str:
        """Prepare detailed analysis prompt including price context.

        Args:
            tx_data: dict as returned by ``analyze_transaction``.

        Returns:
            The prompt text sent to the agent.
        """
        value_usd = tx_data["value_usd"]
        eth_price = tx_data["eth_price"]

        events = tx_data.get("events")
        if events:
            events_line = f"Event Count: {len(events)} events"
        else:
            events_line = "No contract events"

        prompt_lines = (
            "Analyze this Ethereum transaction in current market context:",
            "Transaction Details:",
            f"- Value: {tx_data['value_eth']:.2f} ETH (${value_usd:,.2f} at current price)",
            f"- Current ETH Price: ${eth_price:,.2f}",
            f"- From: {tx_data['from_address']}",
            f"- To: {tx_data['to_address']}",
            f"- Contract Interaction: {tx_data.get('is_contract', False)}",
            f"- Gas Used: {tx_data['gas_used']:,} units",
            f"- Gas Price: {tx_data['gas_price_gwei']:.2f} Gwei",
            f"- Block: {tx_data['block_number']}",
            f"- Timestamp: {tx_data['timestamp']}",
            events_line,
            f"Consider the transaction's significance given the current ETH price of ${eth_price:,.2f} and total USD value of ${value_usd:,.2f}.",
            "Analyze market impact, patterns, risks, and strategic implications.",
        )
        return "\n".join(prompt_lines)

    def save_to_csv(self, tx_data: Dict[str, Any], ai_analysis: str):
        """Save transaction data and analysis to CSV.

        Args:
            tx_data: dict as returned by ``analyze_transaction``.
            ai_analysis: the agent's text; newlines are flattened to spaces
                so each record stays on one line.
        """
        # Tolerate an agent that returns None or a non-string object.
        analysis_text = "" if ai_analysis is None else str(ai_analysis)
        row = [
            tx_data["timestamp"],
            tx_data["transaction_hash"],
            tx_data["from_address"],
            tx_data["to_address"],
            tx_data["value_eth"],
            tx_data["value_usd"],
            tx_data["eth_price"],
            tx_data["gas_used"],
            tx_data["gas_price_gwei"],
            tx_data["block_number"],
            analysis_text.replace("\n", " "),
        ]
        # Explicit UTF-8: model output routinely contains non-ASCII characters.
        with open(
            self.csv_filename, "a", newline="", encoding="utf-8"
        ) as f:
            csv.writer(f).writerow(row)

    def _print_report(self, tx_analysis: Dict[str, Any], ai_analysis: str) -> None:
        """Print the console summary for one analysed transaction."""
        print("\n" + "=" * 50)
        print("New Transaction Analysis")
        print(f"Hash: {tx_analysis['transaction_hash']}")
        print(
            f"Value: {tx_analysis['value_eth']:.2f} ETH (${tx_analysis['value_usd']:,.2f})"
        )
        print(f"Current ETH Price: ${self.eth_price:,.2f}")
        print("=" * 50)
        print(ai_analysis)
        print("=" * 50 + "\n")

    async def _process_block(self, block_number: int) -> None:
        """Analyse, record and report every large transaction in one block."""
        block = self.w3.eth.get_block(
            block_number, full_transactions=True
        )
        for tx in block.transactions:
            # The full block already carries each value, so small transfers
            # are dropped here without any further RPC calls.
            if float(self.w3.from_wei(tx.value, "ether")) < self.min_value_eth:
                continue
            try:
                tx_analysis = await self.analyze_transaction(tx.hash)
                if not tx_analysis:
                    continue
                # Get AI analysis
                ai_analysis = self.agent.run(
                    self.prepare_analysis_prompt(tx_analysis)
                )
                # Save to CSV
                self.save_to_csv(tx_analysis, ai_analysis)
                # Print analysis
                self._print_report(tx_analysis, ai_analysis)
            except Exception as e:
                # Contain the failure to this transaction so the rest of the
                # block is still handled and the block is not re-run.
                logger.error(
                    f"Error processing transaction {tx.hash}: {str(e)}"
                )

    async def _process_new_blocks(self) -> int:
        """Handle each block after ``last_processed_block`` exactly once.

        Returns:
            The number of blocks processed by this call.
        """
        head = self.w3.eth.block_number
        handled = 0
        for block_number in range(self.last_processed_block + 1, head + 1):
            await self._process_block(block_number)
            # Advance only after the block completed, so a block whose fetch
            # failed is retried on the next poll.
            self.last_processed_block = block_number
            handled += 1
        return handled

    async def monitor_transactions(self):
        """Monitor and analyze transactions one at a time.

        Polls the chain once per second, forever. Each block is processed a
        single time: the head block number usually does not change between
        polls, so re-reading "the current block" on every poll would analyse
        and record the same transactions repeatedly and would skip blocks
        mined while a slow analysis was running.
        """
        logger.info(
            f"Starting transaction monitor (minimum value: {self.min_value_eth} ETH)"
        )
        while True:
            try:
                await self._process_new_blocks()
            except Exception as e:
                logger.error(f"Error in monitoring loop: {str(e)}")
            await asyncio.sleep(1)  # Wait for next block
async def main():
    """Create an analyzer with a 100 ETH threshold and run its monitoring loop."""
    await EthereumAnalyzer(min_value_eth=100.0).monitor_transactions()
if __name__ == "__main__":
    # Startup banner, one line per message.
    for banner_line in (
        "Starting Ethereum Transaction Analyzer...",
        "Saving results to ethereum_analysis.csv",
        "Press Ctrl+C to stop",
    ):
        print(banner_line)

    # Run until the user interrupts; Ctrl+C is reported instead of
    # surfacing as a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nStopping analyzer...")