You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
309 lines
10 KiB
309 lines
10 KiB
import os
|
|
from swarms import Agent
|
|
from swarm_models import OpenAIChat
|
|
from web3 import Web3
|
|
from typing import Dict, Optional, Any
|
|
from datetime import datetime
|
|
import asyncio
|
|
from loguru import logger
|
|
from dotenv import load_dotenv
|
|
import csv
|
|
import requests
|
|
import time
|
|
|
|
BLOCKCHAIN_AGENT_PROMPT = """
|
|
You are an expert blockchain and cryptocurrency analyst with deep knowledge of Ethereum markets and DeFi ecosystems.
|
|
You have access to real-time ETH price data and transaction information.
|
|
|
|
For each transaction, analyze:
|
|
|
|
1. MARKET CONTEXT
|
|
- Current ETH price and what this transaction means in USD terms
|
|
- How this movement compares to typical market volumes
|
|
- Whether this could impact ETH price
|
|
|
|
2. BEHAVIORAL ANALYSIS
|
|
- Whether this appears to be institutional, whale, or protocol movement
|
|
- If this fits any known wallet patterns or behaviors
|
|
- Signs of smart contract interaction or DeFi activity
|
|
|
|
3. RISK & IMPLICATIONS
|
|
- Potential market impact or price influence
|
|
- Signs of potential market manipulation or unusual activity
|
|
- Protocol or DeFi risks if applicable
|
|
|
|
4. STRATEGIC INSIGHTS
|
|
- What traders should know about this movement
|
|
- Potential chain reactions or follow-up effects
|
|
- Market opportunities or risks created
|
|
|
|
Write naturally but precisely. Focus on actionable insights and important patterns.
|
|
Your analysis helps traders and researchers understand significant market movements in real-time."""
|
|
|
|
|
|
class EthereumAnalyzer:
|
|
def __init__(self, min_value_eth: float = 100.0):
|
|
load_dotenv()
|
|
|
|
logger.add(
|
|
"eth_analysis.log",
|
|
rotation="500 MB",
|
|
retention="10 days",
|
|
level="INFO",
|
|
format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
|
|
)
|
|
|
|
self.w3 = Web3(
|
|
Web3.HTTPProvider(
|
|
"https://mainnet.infura.io/v3/9aa3d95b3bc440fa88ea12eaa4456161"
|
|
)
|
|
)
|
|
if not self.w3.is_connected():
|
|
raise ConnectionError(
|
|
"Failed to connect to Ethereum network"
|
|
)
|
|
|
|
self.min_value_eth = min_value_eth
|
|
self.last_processed_block = self.w3.eth.block_number
|
|
self.eth_price = self.get_eth_price()
|
|
self.last_price_update = time.time()
|
|
|
|
# Initialize AI agent
|
|
api_key = os.getenv("OPENAI_API_KEY")
|
|
if not api_key:
|
|
raise ValueError(
|
|
"OpenAI API key not found in environment variables"
|
|
)
|
|
|
|
model = OpenAIChat(
|
|
openai_api_key=api_key,
|
|
model_name="gpt-4",
|
|
temperature=0.1,
|
|
)
|
|
|
|
self.agent = Agent(
|
|
agent_name="Ethereum-Analysis-Agent",
|
|
system_prompt=BLOCKCHAIN_AGENT_PROMPT,
|
|
llm=model,
|
|
max_loops=1,
|
|
autosave=True,
|
|
dashboard=False,
|
|
verbose=True,
|
|
dynamic_temperature_enabled=True,
|
|
saved_state_path="eth_agent.json",
|
|
user_name="eth_analyzer",
|
|
retry_attempts=1,
|
|
context_length=200000,
|
|
output_type="string",
|
|
streaming_on=False,
|
|
)
|
|
|
|
self.csv_filename = "ethereum_analysis.csv"
|
|
self.initialize_csv()
|
|
|
|
def get_eth_price(self) -> float:
|
|
"""Get current ETH price from CoinGecko API."""
|
|
try:
|
|
response = requests.get(
|
|
"https://api.coingecko.com/api/v3/simple/price",
|
|
params={"ids": "ethereum", "vs_currencies": "usd"},
|
|
)
|
|
return float(response.json()["ethereum"]["usd"])
|
|
except Exception as e:
|
|
logger.error(f"Error fetching ETH price: {str(e)}")
|
|
return 0.0
|
|
|
|
def update_eth_price(self):
|
|
"""Update ETH price if more than 5 minutes have passed."""
|
|
if time.time() - self.last_price_update > 300: # 5 minutes
|
|
self.eth_price = self.get_eth_price()
|
|
self.last_price_update = time.time()
|
|
logger.info(f"Updated ETH price: ${self.eth_price:,.2f}")
|
|
|
|
def initialize_csv(self):
|
|
"""Initialize CSV file with headers."""
|
|
headers = [
|
|
"timestamp",
|
|
"transaction_hash",
|
|
"from_address",
|
|
"to_address",
|
|
"value_eth",
|
|
"value_usd",
|
|
"eth_price",
|
|
"gas_used",
|
|
"gas_price_gwei",
|
|
"block_number",
|
|
"analysis",
|
|
]
|
|
|
|
if not os.path.exists(self.csv_filename):
|
|
with open(self.csv_filename, "w", newline="") as f:
|
|
writer = csv.writer(f)
|
|
writer.writerow(headers)
|
|
|
|
async def analyze_transaction(
|
|
self, tx_hash: str
|
|
) -> Optional[Dict[str, Any]]:
|
|
"""Analyze a single transaction."""
|
|
try:
|
|
tx = self.w3.eth.get_transaction(tx_hash)
|
|
receipt = self.w3.eth.get_transaction_receipt(tx_hash)
|
|
|
|
value_eth = float(self.w3.from_wei(tx.value, "ether"))
|
|
|
|
if value_eth < self.min_value_eth:
|
|
return None
|
|
|
|
block = self.w3.eth.get_block(tx.blockNumber)
|
|
|
|
# Update ETH price if needed
|
|
self.update_eth_price()
|
|
|
|
value_usd = value_eth * self.eth_price
|
|
|
|
analysis = {
|
|
"timestamp": datetime.fromtimestamp(
|
|
block.timestamp
|
|
).isoformat(),
|
|
"transaction_hash": tx_hash.hex(),
|
|
"from_address": tx["from"],
|
|
"to_address": tx.to if tx.to else "Contract Creation",
|
|
"value_eth": value_eth,
|
|
"value_usd": value_usd,
|
|
"eth_price": self.eth_price,
|
|
"gas_used": receipt.gasUsed,
|
|
"gas_price_gwei": float(
|
|
self.w3.from_wei(tx.gasPrice, "gwei")
|
|
),
|
|
"block_number": tx.blockNumber,
|
|
}
|
|
|
|
# Check if it's a contract
|
|
if tx.to:
|
|
code = self.w3.eth.get_code(tx.to)
|
|
analysis["is_contract"] = len(code) > 0
|
|
|
|
# Get contract events
|
|
if analysis["is_contract"]:
|
|
analysis["events"] = receipt.logs
|
|
|
|
return analysis
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error analyzing transaction {tx_hash}: {str(e)}"
|
|
)
|
|
return None
|
|
|
|
def prepare_analysis_prompt(self, tx_data: Dict[str, Any]) -> str:
|
|
"""Prepare detailed analysis prompt including price context."""
|
|
value_usd = tx_data["value_usd"]
|
|
eth_price = tx_data["eth_price"]
|
|
|
|
prompt = f"""Analyze this Ethereum transaction in current market context:
|
|
|
|
Transaction Details:
|
|
- Value: {tx_data['value_eth']:.2f} ETH (${value_usd:,.2f} at current price)
|
|
- Current ETH Price: ${eth_price:,.2f}
|
|
- From: {tx_data['from_address']}
|
|
- To: {tx_data['to_address']}
|
|
- Contract Interaction: {tx_data.get('is_contract', False)}
|
|
- Gas Used: {tx_data['gas_used']:,} units
|
|
- Gas Price: {tx_data['gas_price_gwei']:.2f} Gwei
|
|
- Block: {tx_data['block_number']}
|
|
- Timestamp: {tx_data['timestamp']}
|
|
|
|
{f"Event Count: {len(tx_data['events'])} events" if tx_data.get('events') else "No contract events"}
|
|
|
|
Consider the transaction's significance given the current ETH price of ${eth_price:,.2f} and total USD value of ${value_usd:,.2f}.
|
|
Analyze market impact, patterns, risks, and strategic implications."""
|
|
|
|
return prompt
|
|
|
|
def save_to_csv(self, tx_data: Dict[str, Any], ai_analysis: str):
|
|
"""Save transaction data and analysis to CSV."""
|
|
row = [
|
|
tx_data["timestamp"],
|
|
tx_data["transaction_hash"],
|
|
tx_data["from_address"],
|
|
tx_data["to_address"],
|
|
tx_data["value_eth"],
|
|
tx_data["value_usd"],
|
|
tx_data["eth_price"],
|
|
tx_data["gas_used"],
|
|
tx_data["gas_price_gwei"],
|
|
tx_data["block_number"],
|
|
ai_analysis.replace("\n", " "),
|
|
]
|
|
|
|
with open(self.csv_filename, "a", newline="") as f:
|
|
writer = csv.writer(f)
|
|
writer.writerow(row)
|
|
|
|
async def monitor_transactions(self):
|
|
"""Monitor and analyze transactions one at a time."""
|
|
logger.info(
|
|
f"Starting transaction monitor (minimum value: {self.min_value_eth} ETH)"
|
|
)
|
|
|
|
while True:
|
|
try:
|
|
current_block = self.w3.eth.block_number
|
|
block = self.w3.eth.get_block(
|
|
current_block, full_transactions=True
|
|
)
|
|
|
|
for tx in block.transactions:
|
|
tx_analysis = await self.analyze_transaction(
|
|
tx.hash
|
|
)
|
|
|
|
if tx_analysis:
|
|
# Get AI analysis
|
|
analysis_prompt = (
|
|
self.prepare_analysis_prompt(tx_analysis)
|
|
)
|
|
ai_analysis = self.agent.run(analysis_prompt)
|
|
print(ai_analysis)
|
|
|
|
# Save to CSV
|
|
self.save_to_csv(tx_analysis, ai_analysis)
|
|
|
|
# Print analysis
|
|
print("\n" + "=" * 50)
|
|
print("New Transaction Analysis")
|
|
print(
|
|
f"Hash: {tx_analysis['transaction_hash']}"
|
|
)
|
|
print(
|
|
f"Value: {tx_analysis['value_eth']:.2f} ETH (${tx_analysis['value_usd']:,.2f})"
|
|
)
|
|
print(
|
|
f"Current ETH Price: ${self.eth_price:,.2f}"
|
|
)
|
|
print("=" * 50)
|
|
print(ai_analysis)
|
|
print("=" * 50 + "\n")
|
|
|
|
await asyncio.sleep(1) # Wait for next block
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in monitoring loop: {str(e)}")
|
|
await asyncio.sleep(1)
|
|
|
|
|
|
async def main():
|
|
"""Entry point for the analysis system."""
|
|
analyzer = EthereumAnalyzer(min_value_eth=100.0)
|
|
await analyzer.monitor_transactions()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
print("Starting Ethereum Transaction Analyzer...")
|
|
print("Saving results to ethereum_analysis.csv")
|
|
print("Press Ctrl+C to stop")
|
|
try:
|
|
asyncio.run(main())
|
|
except KeyboardInterrupt:
|
|
print("\nStopping analyzer...")
|