pull/703/head
Kye Gomez 4 months ago committed by mike dupont
parent 8623a09e41
commit a673ba2d71

@ -7,19 +7,21 @@ from swarms.prompts.finance_agent_sys_prompt import (
agent = Agent(
    agent_name="Financial-Analysis-Agent",
    agent_description="Personal finance advisor agent",
-    system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
-    max_loops=1,
-    model_name="gpt-4o",
+    system_prompt=FINANCIAL_AGENT_SYS_PROMPT + "Output the <DONE> token when you're done creating a portfolio of etfs, index, funds, and more for AI",
+    model_name="gpt-4o",  # Use any model from litellm
+    max_loops="auto",
    dynamic_temperature_enabled=True,
-    user_name="swarms_corp",
+    user_name="Kye",
    retry_attempts=3,
-    context_length=8192,
+    streaming_on=True,
+    context_length=16000,
    return_step_meta=False,
    output_type="str",  # "json", "dict", "csv" OR "string" "yaml" and
    auto_generate_prompt=False,  # Auto generate prompt for the agent based on name, description, and system prompt, task
-    max_tokens=4000,  # max output tokens
-    saved_state_path="agent_00.json",
-    interactive=False,
+    max_tokens=16000,  # max output tokens
+    interactive=True,
+    stopping_token="<DONE>",
    execute_tool=True,
)
agent.run(

@ -0,0 +1,387 @@
from typing import List, Dict, Optional, Union, Any
from dataclasses import dataclass
from pathlib import Path
import numpy as np
from scipy.sparse import csr_matrix
from sklearn.cluster import AgglomerativeClustering
from sentence_transformers import SentenceTransformer
import faiss
import pickle
import time
from loguru import logger
from concurrent.futures import ThreadPoolExecutor
import threading
import uuid
@dataclass
class Document:
"""Represents a document in the HQD-RAG system.
Attributes:
id (str): Unique identifier for the document
content (str): Raw text content of the document
embedding (Optional[np.ndarray]): Quantum-inspired embedding vector
cluster_id (Optional[int]): ID of the cluster this document belongs to
"""
id: str
content: str
embedding: Optional[np.ndarray] = None
cluster_id: Optional[int] = None
class HQDRAG:
"""
Hierarchical Quantum-Inspired Distributed RAG (HQD-RAG) System
A production-grade implementation of the HQD-RAG algorithm for ultra-fast
and reliable document retrieval. Uses quantum-inspired embeddings and
hierarchical clustering for efficient search.
Attributes:
embedding_dim (int): Dimension of the quantum-inspired embeddings
num_clusters (int): Number of hierarchical clusters
similarity_threshold (float): Threshold for quantum similarity matching
reliability_threshold (float): Threshold for reliability verification
"""
def __init__(
self,
embedding_dim: int = 768,
num_clusters: int = 128,
similarity_threshold: float = 0.75,
reliability_threshold: float = 0.85,
model_name: str = "all-MiniLM-L6-v2"
):
"""Initialize the HQD-RAG system.
Args:
embedding_dim: Dimension of document embeddings
num_clusters: Number of clusters for hierarchical organization
similarity_threshold: Minimum similarity score for retrieval
reliability_threshold: Minimum reliability score for verification
model_name: Name of the sentence transformer model to use
"""
logger.info(f"Initializing HQD-RAG with {embedding_dim} dimensions")
self.embedding_dim = embedding_dim
self.num_clusters = num_clusters
self.similarity_threshold = similarity_threshold
self.reliability_threshold = reliability_threshold
# Initialize components
self.documents: Dict[str, Document] = {}
self.encoder = SentenceTransformer(model_name)
self.index = faiss.IndexFlatIP(embedding_dim) # Inner product index
self.clustering = AgglomerativeClustering(
n_clusters=num_clusters,
metric='euclidean',
linkage='ward'
)
# Thread safety
self._lock = threading.Lock()
self._executor = ThreadPoolExecutor(max_workers=4)
logger.info("HQD-RAG system initialized successfully")
def _compute_quantum_embedding(self, text: str) -> np.ndarray:
"""Compute quantum-inspired embedding for text.
Args:
text: Input text to embed
Returns:
Quantum-inspired embedding vector
"""
# Get base embedding
base_embedding = self.encoder.encode([text])[0]
# Apply quantum-inspired transformation
# Simulate superposition by adding phase components
phase = np.exp(2j * np.pi * np.random.random(self.embedding_dim))
quantum_embedding = base_embedding * phase
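# Note: multiplying by the phase factor makes this vector complex-valued, and the
# phase length follows self.embedding_dim, which is assumed to match the encoder's
# output size (all-MiniLM-L6-v2 emits 384-dim vectors); FAISS indexes also expect
# real float32 data, so in practice the real part or magnitude would be indexed.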
# Normalize to unit length
return quantum_embedding / np.linalg.norm(quantum_embedding)
def _verify_reliability(self, doc: Document, query_embedding: np.ndarray) -> float:
"""Verify the reliability of a document match.
Args:
doc: Document to verify
query_embedding: Query embedding vector
Returns:
Reliability score between 0 and 1
"""
if doc.embedding is None:
return 0.0
# Compute consistency score
consistency = np.abs(np.dot(doc.embedding, query_embedding))
# Add quantum noise resistance check
noise = np.random.normal(0, 0.1, self.embedding_dim)
noisy_query = query_embedding + noise
noisy_query = noisy_query / np.linalg.norm(noisy_query)
noise_resistance = np.abs(np.dot(doc.embedding, noisy_query))
return (consistency + noise_resistance) / 2
def add(self, content: str, doc_id: Optional[str] = None) -> str:
"""Add a document to the system.
Args:
content: Document text content
doc_id: Optional custom document ID
Returns:
Document ID
"""
doc_id = doc_id or str(uuid.uuid4())
with self._lock:
try:
# Compute embedding
embedding = self._compute_quantum_embedding(content)
# Create document
doc = Document(
id=doc_id,
content=content,
embedding=embedding
)
# Add to storage
self.documents[doc_id] = doc
self.index.add(embedding.reshape(1, -1))
# Update clustering
self._update_clusters()
logger.info(f"Successfully added document {doc_id}")
return doc_id
except Exception as e:
logger.error(f"Error adding document: {str(e)}")
raise
def query(
self,
query: str,
k: int = 5,
return_scores: bool = False
) -> Union[List[str], List[tuple[str, float]]]:
"""Query the system for relevant documents.
Args:
query: Query text
k: Number of results to return
return_scores: Whether to return similarity scores
Returns:
List of document IDs or (document ID, score) tuples
"""
try:
# Compute query embedding
query_embedding = self._compute_quantum_embedding(query)
# Search index
scores, indices = self.index.search(
query_embedding.reshape(1, -1),
k * 2 # Get extra results for reliability filtering
)
results = []
for score, idx in zip(scores[0], indices[0]):
# Get document
doc_id = list(self.documents.keys())[idx]
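# Assumes the insertion order of self.documents mirrors the FAISS index order;
# deletions and updates can break this mapping unless an ID-mapped index is used.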
doc = self.documents[doc_id]
# Verify reliability
reliability = self._verify_reliability(doc, query_embedding)
if reliability >= self.reliability_threshold:
results.append((doc_id, float(score)))
if len(results) >= k:
break
logger.info(f"Query returned {len(results)} results")
if return_scores:
return results
return [doc_id for doc_id, _ in results]
except Exception as e:
logger.error(f"Error processing query: {str(e)}")
raise
def update(self, doc_id: str, new_content: str) -> None:
"""Update an existing document.
Args:
doc_id: ID of document to update
new_content: New document content
"""
with self._lock:
try:
if doc_id not in self.documents:
raise KeyError(f"Document {doc_id} not found")
# Remove old embedding
old_doc = self.documents[doc_id]
if old_doc.embedding is not None:
self.index.remove_ids(np.array([list(self.documents.keys()).index(doc_id)]))
# Compute new embedding
new_embedding = self._compute_quantum_embedding(new_content)
# Update document
self.documents[doc_id] = Document(
id=doc_id,
content=new_content,
embedding=new_embedding
)
# Add new embedding
self.index.add(new_embedding.reshape(1, -1))
# Update clustering
self._update_clusters()
logger.info(f"Successfully updated document {doc_id}")
except Exception as e:
logger.error(f"Error updating document: {str(e)}")
raise
def delete(self, doc_id: str) -> None:
"""Delete a document from the system.
Args:
doc_id: ID of document to delete
"""
with self._lock:
try:
if doc_id not in self.documents:
raise KeyError(f"Document {doc_id} not found")
# Remove from index
idx = list(self.documents.keys()).index(doc_id)
self.index.remove_ids(np.array([idx]))
# Remove from storage
del self.documents[doc_id]
# Update clustering
self._update_clusters()
logger.info(f"Successfully deleted document {doc_id}")
except Exception as e:
logger.error(f"Error deleting document: {str(e)}")
raise
def _update_clusters(self) -> None:
"""Update hierarchical document clusters."""
if len(self.documents) < 2:
return
# Get all embeddings
embeddings = np.vstack([
doc.embedding for doc in self.documents.values()
if doc.embedding is not None
])
# Update clustering
clusters = self.clustering.fit_predict(embeddings)
# Assign cluster IDs
for doc, cluster_id in zip(self.documents.values(), clusters):
doc.cluster_id = int(cluster_id)
def save(self, path: Union[str, Path]) -> None:
"""Save the system state to disk.
Args:
path: Path to save directory
"""
path = Path(path)
path.mkdir(parents=True, exist_ok=True)
try:
# Save documents
with open(path / "documents.pkl", "wb") as f:
pickle.dump(self.documents, f)
# Save index
faiss.write_index(self.index, str(path / "index.faiss"))
logger.info(f"Successfully saved system state to {path}")
except Exception as e:
logger.error(f"Error saving system state: {str(e)}")
raise
def load(self, path: Union[str, Path]) -> None:
"""Load the system state from disk.
Args:
path: Path to save directory
"""
path = Path(path)
try:
# Load documents
with open(path / "documents.pkl", "rb") as f:
self.documents = pickle.load(f)
# Load index
self.index = faiss.read_index(str(path / "index.faiss"))
logger.info(f"Successfully loaded system state from {path}")
except Exception as e:
logger.error(f"Error loading system state: {str(e)}")
raise
# Example usage:
if __name__ == "__main__":
# Configure logging
logger.add(
"hqd_rag.log",
rotation="1 day",
retention="1 week",
level="INFO"
)
# Initialize system
rag = HQDRAG()
# Add some documents
doc_ids = []
docs = [
"The quick brown fox jumps over the lazy dog",
"Machine learning is a subset of artificial intelligence",
"Python is a popular programming language"
]
for doc in docs:
doc_id = rag.add(doc)
doc_ids.append(doc_id)
# Query
results = rag.query("What is machine learning?", return_scores=True)
print("Query results:", results)
# # Update a document
# rag.update(doc_ids[0], "The fast brown fox jumps over the sleepy dog")
# # Delete a document
# rag.delete(doc_ids[-1])
# # Save state
# rag.save("hqd_rag_state")

@ -5,8 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
-version = "6.7.5"
+version = "6.5.7"
description = "Swarms - TGSC"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]
@ -64,35 +63,23 @@ asyncio = ">=3.4.3,<4.0"
toml = "*" toml = "*"
pypdf = "5.1.0" pypdf = "5.1.0"
loguru = "*" loguru = "*"
<<<<<<< HEAD pydantic = "2.8.2"
pydantic = "*"
=======
pydantic = ">=2.8.2<3.0"
>>>>>>> 66cb7a58 ([6.5.7])
tenacity = "*" tenacity = "*"
psutil = "*" psutil = "*"
sentry-sdk = "*" sentry-sdk = {version = "*", extras = ["http"]} # Updated here
python-dotenv = "*" python-dotenv = "*"
PyYAML = "*" PyYAML = "*"
docstring_parser = "0.16" # TODO: docstring_parser = "0.16"
tiktoken = "*" tiktoken = "*"
networkx = "*" networkx = "*"
aiofiles = "*" aiofiles = "*"
<<<<<<< HEAD
=======
clusterops = "*" clusterops = "*"
>>>>>>> 66cb7a58 ([6.5.7])
# chromadb = "*" # chromadb = "*"
reportlab = "*" reportlab = "*"
doc-master = "*" doc-master = "*"
rich = "*" rich = "*"
# sentence-transformers = "*" # sentence-transformers = "*"
swarm-models = "*" swarm-models = "*"
<<<<<<< HEAD
termcolor = "*"
clusterops = { git = "https://github.com/patrickbdevaney/clusterops.git", branch = "main" }
=======
>>>>>>> 66cb7a58 ([6.5.7])
# [tool.poetry.extras] # [tool.poetry.extras]
@ -124,7 +111,7 @@ swarms = "swarms.cli.main:main"
[tool.poetry.group.lint.dependencies]
black = ">=23.1,<25.0"
-ruff = ">=0.5.1,<0.8.4"
+ruff = ">=0.5.1,<0.8.3"
types-toml = "^0.10.8.1"
types-pytz = ">=2023.3,<2025.0"
types-chardet = "^5.0.4.6"

@ -1,433 +0,0 @@
import asyncio
import json
from dataclasses import asdict, dataclass
from datetime import datetime
from typing import Dict, List, Optional, Set
import aiohttp
import matplotlib.pyplot as plt
import networkx as nx
import websockets
from loguru import logger
from swarms import Agent
TREND_AGENT_PROMPT = """You are a specialized blockchain trend analysis agent. Your role:
1. Analyze transaction patterns in Solana blockchain data
2. Identify volume trends, price movements, and temporal patterns
3. Focus on whale movements and their market impact
4. Format findings in clear, structured JSON
5. Include confidence scores for each insight
6. Flag unusual patterns or anomalies
7. Provide historical context for significant movements
Output format:
{
"trends": [
{"pattern": str, "confidence": float, "impact": str}
],
"whale_activity": {...},
"temporal_analysis": {...}
}"""
RISK_AGENT_PROMPT = """You are a blockchain risk assessment specialist. Your tasks:
1. Identify suspicious transaction patterns
2. Monitor for known exploit signatures
3. Assess wallet clustering and relationship patterns
4. Evaluate transaction velocity and size anomalies
5. Check for bridge-related risks
6. Monitor smart contract interactions
7. Flag potential wash trading
Output format:
{
"risk_score": float,
"flags": [...],
"recommendations": [...]
}"""
SUMMARY_AGENT_PROMPT = """You are a blockchain data synthesis expert. Your responsibilities:
1. Combine insights from trend and risk analyses
2. Prioritize actionable intelligence
3. Highlight critical patterns
4. Generate executive summaries
5. Provide market context
6. Make predictions with confidence intervals
7. Suggest trading strategies based on data
Output format:
{
"key_insights": [...],
"market_impact": str,
"recommendations": {...}
}"""
@dataclass
class Transaction:
signature: str
timestamp: datetime
amount: float
from_address: str
to_address: str
class SolanaRPC:
def __init__(
self, endpoint="https://api.mainnet-beta.solana.com"
):
self.endpoint = endpoint
self.session = None
async def get_signatures(self, address: str) -> List[Dict]:
if not self.session:
self.session = aiohttp.ClientSession()
payload = {
"jsonrpc": "2.0",
"id": 1,
"method": "getSignaturesForAddress",
"params": [address, {"limit": 100}],
}
async with self.session.post(
self.endpoint, json=payload
) as response:
result = await response.json()
return result.get("result", [])
async def get_transaction(self, signature: str) -> Dict:
payload = {
"jsonrpc": "2.0",
"id": 1,
"method": "getTransaction",
"params": [
signature,
{
"encoding": "json",
"maxSupportedTransactionVersion": 0,
},
],
}
async with self.session.post(
self.endpoint, json=payload
) as response:
result = await response.json()
return result.get("result", {})
class AlertSystem:
def __init__(self, email: str, threshold: float = 1000.0):
self.email = email
self.threshold = threshold
self.smtp_server = "smtp.gmail.com"
self.smtp_port = 587
async def check_and_alert(
self, transaction: Transaction, risk_score: float
):
if transaction.amount > self.threshold or risk_score > 0.8:
await self.send_alert(transaction, risk_score)
async def send_alert(
self, transaction: Transaction, risk_score: float
):
# msg = MIMEText(
# f"High-risk transaction detected:\n"
# f"Amount: {transaction.amount} SOL\n"
# f"Risk Score: {risk_score}\n"
# f"Signature: {transaction.signature}"
# )
logger.info(
f"Alert sent for transaction {transaction.signature}"
)
class WalletClusterAnalyzer:
def __init__(self):
self.graph = nx.Graph()
self.known_wallets: Set[str] = set()
def update_graph(self, transaction: Transaction):
self.graph.add_edge(
transaction.from_address,
transaction.to_address,
weight=transaction.amount,
)
self.known_wallets.add(transaction.from_address)
self.known_wallets.add(transaction.to_address)
def identify_clusters(self) -> Dict:
communities = nx.community.greedy_modularity_communities(
self.graph
)
return {
"clusters": [list(c) for c in communities],
"central_wallets": [
wallet
for wallet in self.known_wallets
if self.graph.degree[wallet] > 5
],
}
class TransactionVisualizer:
def __init__(self):
self.transaction_history = []
def add_transaction(self, transaction: Transaction):
self.transaction_history.append(asdict(transaction))
def generate_volume_chart(self) -> str:
volumes = [tx["amount"] for tx in self.transaction_history]
plt.figure(figsize=(12, 6))
plt.plot(volumes)
plt.title("Transaction Volume Over Time")
plt.savefig("volume_chart.png")
return "volume_chart.png"
def generate_network_graph(
self, wallet_analyzer: WalletClusterAnalyzer
) -> str:
plt.figure(figsize=(15, 15))
pos = nx.spring_layout(wallet_analyzer.graph)
nx.draw(
wallet_analyzer.graph,
pos,
node_size=1000,
node_color="lightblue",
with_labels=True,
)
plt.savefig("network_graph.png")
return "network_graph.png"
class SolanaMultiAgentAnalyzer:
def __init__(
self,
min_amount: float = 50.0,
websocket_url: str = "wss://api.mainnet-beta.solana.com",
alert_email: str = None,
):
self.rpc = SolanaRPC()
self.websocket_url = websocket_url
self.min_amount = min_amount
self.transactions = []
self.wallet_analyzer = WalletClusterAnalyzer()
self.visualizer = TransactionVisualizer()
self.alert_system = (
AlertSystem(alert_email) if alert_email else None
)
self.trend_agent = Agent(
agent_name="trend-analyzer",
system_prompt=TREND_AGENT_PROMPT,
model_name="gpt-4o-mini",
max_loops=1,
streaming_on=True,
)
self.risk_agent = Agent(
agent_name="risk-analyzer",
system_prompt=RISK_AGENT_PROMPT,
model_name="gpt-4o-mini",
max_loops=1,
streaming_on=True,
)
self.summary_agent = Agent(
agent_name="summary-agent",
system_prompt=SUMMARY_AGENT_PROMPT,
model_name="gpt-4o-mini",
max_loops=1,
streaming_on=True,
)
logger.add(
"solana_analysis.log", rotation="500 MB", level="INFO"
)
async def start_websocket_stream(self):
async with websockets.connect(
self.websocket_url
) as websocket:
subscribe_message = {
"jsonrpc": "2.0",
"id": 1,
"method": "programSubscribe",
"params": [
"11111111111111111111111111111111",
{"encoding": "json", "commitment": "confirmed"},
],
}
await websocket.send(json.dumps(subscribe_message))
while True:
try:
msg = await websocket.recv()
transaction = await self.parse_websocket_message(
msg
)
if (
transaction
and transaction.amount >= self.min_amount
):
await self.process_transaction(transaction)
except Exception as e:
logger.error(f"Websocket error: {e}")
await asyncio.sleep(5)
async def parse_websocket_message(
self, msg: str
) -> Optional[Transaction]:
try:
data = json.loads(msg)
if "params" in data and "result" in data["params"]:
tx_data = data["params"]["result"]
return Transaction(
signature=tx_data["signature"],
timestamp=datetime.fromtimestamp(
tx_data["blockTime"]
),
amount=float(
tx_data["meta"]["postBalances"][0]
- tx_data["meta"]["preBalances"][0]
)
/ 1e9,
from_address=tx_data["transaction"]["message"][
"accountKeys"
][0],
to_address=tx_data["transaction"]["message"][
"accountKeys"
][1],
)
except Exception as e:
logger.error(f"Error parsing websocket message: {e}")
return None
async def process_transaction(self, transaction: Transaction):
self.wallet_analyzer.update_graph(transaction)
self.visualizer.add_transaction(transaction)
risk_analysis = await self.risk_agent.run(
f"Analyze risk for transaction: {json.dumps(asdict(transaction))}"
)
if self.alert_system:
await self.alert_system.check_and_alert(
transaction, risk_analysis.get("risk_score", 0)
)
async def fetch_transactions(self) -> List[Transaction]:
try:
signatures = await self.rpc.get_signatures(
"11111111111111111111111111111111"
)
transactions = []
for sig_info in signatures:
tx_data = await self.rpc.get_transaction(
sig_info["signature"]
)
if not tx_data or "meta" not in tx_data:
continue
pre_balances = tx_data["meta"]["preBalances"]
post_balances = tx_data["meta"]["postBalances"]
amount = abs(pre_balances[0] - post_balances[0]) / 1e9
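# Balance deltas are denominated in lamports; dividing by 1e9 converts to SOL.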
if amount >= self.min_amount:
tx = Transaction(
signature=sig_info["signature"],
timestamp=datetime.fromtimestamp(
tx_data["blockTime"]
),
amount=amount,
from_address=tx_data["transaction"][
"message"
]["accountKeys"][0],
to_address=tx_data["transaction"]["message"][
"accountKeys"
][1],
)
transactions.append(tx)
return transactions
except Exception as e:
logger.error(f"Error fetching transactions: {e}")
return []
async def analyze_transactions(
self, transactions: List[Transaction]
) -> Dict:
tx_data = [asdict(tx) for tx in transactions]
cluster_data = self.wallet_analyzer.identify_clusters()
trend_analysis = await self.trend_agent.run(
f"Analyze trends in: {json.dumps(tx_data)}"
)
print(trend_analysis)
risk_analysis = await self.risk_agent.run(
f"Analyze risks in: {json.dumps({'transactions': tx_data, 'clusters': cluster_data})}"
)
print(risk_analysis)
summary = await self.summary_agent.run(
f"Synthesize insights from: {trend_analysis}, {risk_analysis}"
)
print(summary)
volume_chart = self.visualizer.generate_volume_chart()
network_graph = self.visualizer.generate_network_graph(
self.wallet_analyzer
)
return {
"transactions": tx_data,
"trend_analysis": trend_analysis,
"risk_analysis": risk_analysis,
"cluster_analysis": cluster_data,
"summary": summary,
"visualizations": {
"volume_chart": volume_chart,
"network_graph": network_graph,
},
}
async def run_continuous_analysis(self):
logger.info("Starting continuous analysis")
asyncio.create_task(self.start_websocket_stream())
while True:
try:
transactions = await self.fetch_transactions()
if transactions:
analysis = await self.analyze_transactions(
transactions
)
timestamp = datetime.now().strftime(
"%Y%m%d_%H%M%S"
)
with open(f"analysis_{timestamp}.json", "w") as f:
json.dump(analysis, f, indent=2, default=str)
logger.info(
f"Analysis completed: analysis_{timestamp}.json"
)
await asyncio.sleep(60)
except Exception as e:
logger.error(f"Error in analysis loop: {e}")
await asyncio.sleep(60)
# Add to __main__:
if __name__ == "__main__":
logger.info("Starting Solana analyzer...")
analyzer = SolanaMultiAgentAnalyzer(alert_email="your@email.com")
try:
asyncio.run(analyzer.run_continuous_analysis())
except Exception as e:
logger.error(f"Critical error: {e}")

@ -0,0 +1,165 @@
from swarms import Agent, SwarmRouter
# Portfolio Analysis Specialist
portfolio_analyzer = Agent(
agent_name="Portfolio-Analysis-Specialist",
system_prompt="""You are an expert portfolio analyst specializing in fund analysis and selection. Your core competencies include:
- Comprehensive analysis of mutual funds, ETFs, and index funds
- Evaluation of fund performance metrics (expense ratios, tracking error, Sharpe ratio)
- Assessment of fund composition and strategy alignment
- Risk-adjusted return analysis
- Tax efficiency considerations
For each portfolio analysis:
1. Evaluate fund characteristics and performance metrics
2. Analyze expense ratios and fee structures
3. Assess historical performance and volatility
4. Compare funds within same category
5. Consider tax implications
6. Review fund manager track record and strategy consistency
Maintain focus on cost-efficiency and alignment with investment objectives.""",
model_name="gpt-4o",
max_loops=1,
saved_state_path="portfolio_analyzer.json",
user_name="investment_team",
retry_attempts=2,
context_length=200000,
output_type="string",
)
# Asset Allocation Strategist
allocation_strategist = Agent(
agent_name="Asset-Allocation-Strategist",
system_prompt="""You are a specialized asset allocation strategist focused on portfolio construction and optimization. Your expertise includes:
- Strategic and tactical asset allocation
- Risk tolerance assessment and portfolio matching
- Geographic and sector diversification
- Rebalancing strategy development
- Portfolio optimization using modern portfolio theory
For each allocation:
1. Analyze investor risk tolerance and objectives
2. Develop appropriate asset class weights
3. Select optimal fund combinations
4. Design rebalancing triggers and schedules
5. Consider tax-efficient fund placement
6. Account for correlation between assets
Focus on creating well-diversified portfolios aligned with client goals and risk tolerance.""",
model_name="gpt-4o",
max_loops=1,
saved_state_path="allocation_strategist.json",
user_name="investment_team",
retry_attempts=2,
context_length=200000,
output_type="string",
)
# Risk Management Specialist
risk_manager = Agent(
agent_name="Risk-Management-Specialist",
system_prompt="""You are a risk management specialist focused on portfolio risk assessment and mitigation. Your expertise covers:
- Portfolio risk metrics analysis
- Downside protection strategies
- Correlation analysis between funds
- Stress testing and scenario analysis
- Market condition impact assessment
For each portfolio:
1. Calculate key risk metrics (Beta, Standard Deviation, etc.)
2. Analyze correlation matrices
3. Perform stress tests under various scenarios
4. Evaluate liquidity risks
5. Assess concentration risks
6. Monitor factor exposures
Focus on maintaining appropriate risk levels while maximizing risk-adjusted returns.""",
model_name="gpt-4o",
max_loops=1,
saved_state_path="risk_manager.json",
user_name="investment_team",
retry_attempts=2,
context_length=200000,
output_type="string",
)
# Portfolio Implementation Specialist
implementation_specialist = Agent(
agent_name="Portfolio-Implementation-Specialist",
system_prompt="""You are a portfolio implementation specialist focused on efficient execution and maintenance. Your responsibilities include:
- Fund selection for specific asset class exposure
- Tax-efficient implementation strategies
- Portfolio rebalancing execution
- Trading cost analysis
- Cash flow management
For each implementation:
1. Select most efficient funds for desired exposure
2. Plan tax-efficient transitions
3. Design rebalancing schedule
4. Optimize trade execution
5. Manage cash positions
6. Monitor tracking error
Maintain focus on minimizing costs and maximizing tax efficiency during implementation.""",
model_name="gpt-4o",
max_loops=1,
saved_state_path="implementation_specialist.json",
user_name="investment_team",
retry_attempts=2,
context_length=200000,
output_type="string",
)
# Portfolio Monitoring Specialist
monitoring_specialist = Agent(
agent_name="Portfolio-Monitoring-Specialist",
system_prompt="""You are a portfolio monitoring specialist focused on ongoing portfolio oversight and optimization. Your expertise includes:
- Regular portfolio performance review
- Drift monitoring and rebalancing triggers
- Fund changes and replacements
- Tax loss harvesting opportunities
- Performance attribution analysis
For each review:
1. Track portfolio drift from targets
2. Monitor fund performance and changes
3. Identify tax loss harvesting opportunities
4. Analyze tracking error and expenses
5. Review risk metrics evolution
6. Generate performance attribution reports
Ensure continuous alignment with investment objectives while maintaining optimal portfolio efficiency.""",
model_name="gpt-4o",
max_loops=1,
saved_state_path="monitoring_specialist.json",
user_name="investment_team",
retry_attempts=2,
context_length=200000,
output_type="string",
)
# List of all agents for portfolio management
portfolio_agents = [
portfolio_analyzer,
allocation_strategist,
risk_manager,
implementation_specialist,
monitoring_specialist
]
# Router
router = SwarmRouter(
name="etf-portfolio-management-swarm",
description="Creates and suggests an optimal portfolio",
agents=portfolio_agents,
swarm_type="SequentialWorkflow",  # ConcurrentWorkflow
max_loops=1,
)
router.run(
task="I have $10,000 and I want to create a portfolio based on energy, AI, and datacenter companies. High growth."
)

@ -27,24 +27,12 @@ def run_agent_by_name(
    Returns:
        The output of the task run by the agent.
    """
-    # Initialize the agent
-    agent = Agent(
-        agent_name=name,
-        system_prompt=system_prompt,
-        model_name=model_name,
-        max_loops=max_loops,
-        autosave=True,
-        dashboard=False,
-        verbose=True,
-        dynamic_temperature_enabled=True,
-        saved_state_path=f"{name}.json",
-        user_name="swarms_corp",
-        retry_attempts=1,
-        context_length=200000,
-        # return_step_meta=True,
-        # disable_print_every_step=True,
-        # output_type="json",
-        interactive=True,
-    )
-    output = agent.run(task=task, img=img, *args, **kwargs)
+    try:
+        agent = Agent(
+            agent_name=name,
+            system_prompt=system_prompt,
+            model_name=model_name,
+            max_loops=max_loops,
+        )
+
+        output = agent.run(task=task, img=img, *args, **kwargs)

@ -39,7 +39,8 @@ def capture_system_data() -> Dict[str, str]:
system_data["external_ip"] = requests.get( system_data["external_ip"] = requests.get(
"https://api.ipify.org" "https://api.ipify.org"
).text ).text
except Exception: except Exception as e:
logger.warning("Failed to retrieve external IP: {}", e)
system_data["external_ip"] = "N/A" system_data["external_ip"] = "N/A"
return system_data return system_data
@ -48,7 +49,9 @@ def capture_system_data() -> Dict[str, str]:
        return {}


-def log_agent_data(data_dict: dict) -> dict | None:
+def log_agent_data(
+    data_dict: dict
+) -> dict | None:
    """
    Logs agent data to the Swarms database with retry logic.
@ -73,7 +76,25 @@ def log_agent_data(data_dict: dict) -> dict | None:
"Authorization": "Bearer sk-f24a13ed139f757d99cdd9cdcae710fccead92681606a97086d9711f69d44869", "Authorization": "Bearer sk-f24a13ed139f757d99cdd9cdcae710fccead92681606a97086d9711f69d44869",
} }
requests.post(url, json=data_dict, headers=headers, timeout=10) try:
# response.raise_for_status() response = requests.post(
url, json=data_dict, headers=headers, timeout=10
)
response.raise_for_status()
result = response.json()
return result
except requests.exceptions.Timeout:
logger.warning("Request timed out")
except requests.exceptions.HTTPError as e:
logger.error(f"HTTP error occurred: {e}")
if response.status_code == 401:
logger.error("Authentication failed - check API key")
except requests.exceptions.RequestException as e:
logger.error(f"Error logging agent data: {e}")
logger.error("Failed to log agent data")
return None return None

@ -1,17 +1,58 @@
import os
import uuid
from typing import Any, Dict
from loguru import logger
import requests
from swarms.telemetry.sys_info import system_info
def log_agent_data(data: Any) -> Dict:
"""
Send data to the agent logging API endpoint.
Args:
data: Any data structure that can be JSON serialized
Returns:
Dict: The JSON response from the API
"""
try:
# Prepare the data payload
data_dict = {"data": data}
# API endpoint configuration
url = "https://swarms.world/api/get-agents/log-agents"
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer sk-f24a13ed139f757d99cdd9cdcae710fccead92681606a97086d9711f69d44869",
}
# Send the request
response = requests.post(url, json=data_dict, headers=headers)
response.raise_for_status() # Raise an error for HTTP codes 4xx/5xx
# Return the JSON response
return response.json()
except Exception as e:
logger.error(f"Failed to log agent data: {e}")
return {"error": str(e)}
def initialize_logger(log_folder: str = "logs"):
"""
Initialize and configure the Loguru logger.
Args:
log_folder: The folder where logs will be stored.
Returns:
The configured Loguru logger.
"""
AGENT_WORKSPACE = "agent_workspace" AGENT_WORKSPACE = "agent_workspace"
# Check if WORKSPACE_DIR is set, if not, set it to AGENT_WORKSPACE # Check if WORKSPACE_DIR is set, if not, set it to AGENT_WORKSPACE
if "WORKSPACE_DIR" not in os.environ: if "WORKSPACE_DIR" not in os.environ:
os.environ["WORKSPACE_DIR"] = AGENT_WORKSPACE os.environ["WORKSPACE_DIR"] = AGENT_WORKSPACE
# Create a folder within the agent_workspace # Create the log folder within the workspace
log_folder_path = os.path.join( log_folder_path = os.path.join(
os.getenv("WORKSPACE_DIR"), log_folder os.getenv("WORKSPACE_DIR"), log_folder
) )
@ -24,6 +65,7 @@ def initialize_logger(log_folder: str = "logs"):
log_folder_path, f"{log_folder}_{uuid_for_log}.log" log_folder_path, f"{log_folder}_{uuid_for_log}.log"
) )
# Add a Loguru sink for file logging
logger.add( logger.add(
log_file_path, log_file_path,
level="INFO", level="INFO",
@ -31,7 +73,47 @@ def initialize_logger(log_folder: str = "logs"):
backtrace=True,
diagnose=True,
enqueue=True,
# retention="10 days",
# compression="zip",
)
# Add a Loguru sink to intercept all log messages and send them to `log_agent_data`
class AgentLogHandler:
def write(self, message):
if message.strip(): # Avoid sending empty messages
payload = {
"log": str(message.strip()),
"folder": log_folder,
"metadata": system_info(),
}
response = log_agent_data(payload)
logger.debug(f"Sent to API: {payload}, Response: {response}")
logger.add(AgentLogHandler(), level="INFO")
return logger
# if __name__ == "__main__":
# # Initialize the logger
# logger = initialize_logger()
# # Generate test log messages
# logger.info("This is a test info log.")
# logger.warning("This is a test warning log.")
# logger.error("This is a test error log.")
# # Simulate agent data logging
# test_data = {
# "agent_name": "TestAgent",
# "task": "Example Task",
# "status": "Running",
# "details": {
# "runtime": "5s",
# "success": True
# }
# }
# log_agent_data(test_data)
# print("Test logging completed.")

@ -1,42 +0,0 @@
from swarms.structs.tree_swarm import ForestSwarm, Tree, TreeAgent
agents_tree1 = [
TreeAgent(
system_prompt="Stock Analysis Agent",
agent_name="Stock Analysis Agent",
),
TreeAgent(
system_prompt="Financial Planning Agent",
agent_name="Financial Planning Agent",
),
TreeAgent(
agent_name="Retirement Strategy Agent",
system_prompt="Retirement Strategy Agent",
),
]
agents_tree2 = [
TreeAgent(
system_prompt="Tax Filing Agent",
agent_name="Tax Filing Agent",
),
TreeAgent(
system_prompt="Investment Strategy Agent",
agent_name="Investment Strategy Agent",
),
TreeAgent(
system_prompt="ROTH IRA Agent", agent_name="ROTH IRA Agent"
),
]
# Create trees
tree1 = Tree(tree_name="Financial Tree", agents=agents_tree1)
tree2 = Tree(tree_name="Investment Tree", agents=agents_tree2)
# Create the ForestSwarm
multi_agent_structure = ForestSwarm(trees=[tree1, tree2])
# Run a task
task = "Our company is incorporated in delaware, how do we do our taxes for free?"
multi_agent_structure.run(task)

zpk.py

@ -1,206 +0,0 @@
from swarms import Agent
from loguru import logger
import random
import re
# Configure loguru
logger.add("zkp_log.log", rotation="500 KB", retention="10 days", level="INFO")
class ProverAgent:
"""
Prover Agent for Zero Knowledge Proof.
Responsibilities:
- Generate commitments based on a secret.
- Respond to challenges from the Verifier.
Attributes:
agent (Agent): Swarms agent instance.
p (int): The prime modulus.
g (int): The generator.
x (int): The Prover's secret.
"""
def __init__(self, p: int, g: int, secret: int):
self.p = p
self.g = g
self.x = secret # Prover's secret
self.agent = Agent(
agent_name="ProverAgent",
model_name="gpt-4o-mini",
max_loops=1,
interactive=False,
streaming_on=True,
system_prompt=(
"You are the Prover in a Zero Knowledge Proof (ZKP) system. "
"Your responsibilities are to generate commitments based on a secret value and "
"respond to challenges from the Verifier without revealing the secret. "
"Follow mathematical rules of modular arithmetic when performing computations."
),
)
logger.info("Initialized ProverAgent with p={}, g={}, secret={}", p, g, secret)
def generate_commitment(self) -> tuple[int, int]:
"""
Generates a random commitment for the proof.
Returns:
tuple[int, int]: The random value (r) and the commitment (t).
"""
r = random.randint(1, self.p - 2)
task = (
f"Compute the commitment t = g^r % p for g={self.g}, r={r}, p={self.p}. "
"Return only the numerical value of t as an integer."
)
t = self.agent.run(task=task)
t_value = self._extract_integer(t, "commitment")
logger.info("Prover generated commitment: r={}, t={}", r, t_value)
return r, t_value
def _extract_integer(self, response: str, label: str) -> int:
"""
Extracts an integer from the LLM response.
Args:
response (str): The response from the agent.
label (str): A label for logging purposes.
Returns:
int: The extracted integer value.
"""
try:
# Use regex to find the first integer in the response
match = re.search(r"\b\d+\b", response)
if match:
value = int(match.group(0))
return value
else:
raise ValueError(f"No integer found in {label} response: {response}")
except Exception as e:
logger.error("Failed to extract integer from {label} response: {response}")
raise ValueError(f"Invalid {label} response: {response}") from e
def respond_to_challenge(self, r: int, c: int) -> int:
"""
Computes the response to a challenge.
Args:
r (int): The random value used in the commitment.
c (int): The challenge issued by the Verifier.
Returns:
int: The response (z).
"""
task = f"Compute the response z = (r + c * x) % (p-1) for r={r}, c={c}, x={self.x}, p={self.p}."
z = self.agent.run(task=task)
z_value = self._extract_integer(z, "challenge response")
logger.info("Prover responded to challenge: z={}", z_value)
return z_value
class VerifierAgent:
"""
Verifier Agent for Zero Knowledge Proof.
Responsibilities:
- Issue challenges to the Prover.
- Verify the Prover's response.
Attributes:
agent (Agent): Swarms agent instance.
p (int): The prime modulus.
g (int): The generator.
y (int): The public value from the Prover.
"""
def __init__(self, p: int, g: int, y: int):
self.p = p
self.g = g
self.y = y # Public value
self.agent = Agent(
agent_name="VerifierAgent",
model_name="gpt-4o-mini",
max_loops=1,
interactive=False,
streaming_on=True,
system_prompt=(
"You are the Verifier in a Zero Knowledge Proof (ZKP) system. "
"Your responsibilities are to issue random challenges and verify the Prover's response. "
"Use modular arithmetic to check if the proof satisfies g^z % p == (t * y^c) % p."
),
)
logger.info("Initialized VerifierAgent with p={}, g={}, y={}", p, g, y)
def issue_challenge(self) -> int:
"""
Issues a random challenge to the Prover.
Returns:
int: The challenge value (c).
"""
c = random.randint(1, 10)
logger.info("Verifier issued challenge: c={}", c)
return c
def verify_proof(self, t: int, z: int, c: int) -> bool:
"""
Verifies the Prover's response.
Args:
t (int): The commitment from the Prover.
z (int): The response from the Prover.
c (int): The challenge issued to the Prover.
Returns:
bool: True if the proof is valid, False otherwise.
"""
task = f"Verify if g^z % p == (t * y^c) % p for g={self.g}, z={z}, p={self.p}, t={t}, y={self.y}, c={c}."
verification_result = self.agent.run(task=task)
is_valid = verification_result.strip().lower() == "true"
logger.info("Verifier checked proof: t={}, z={}, c={}, valid={}", t, z, c, is_valid)
return is_valid
class CoordinatorAgent:
"""
Coordinator for orchestrating the Zero Knowledge Proof protocol.
Responsibilities:
- Initialize parameters.
- Facilitate interaction between Prover and Verifier agents.
"""
def __init__(self, p: int, g: int, secret: int):
self.p = p
self.g = g
self.prover = ProverAgent(p, g, secret)
y = pow(g, secret, p) # Public value
self.verifier = VerifierAgent(p, g, y)
logger.info("Coordinator initialized with p={}, g={}, secret={}", p, g, secret)
def orchestrate(self) -> bool:
"""
Orchestrates the Zero Knowledge Proof protocol.
Returns:
bool: True if the proof is valid, False otherwise.
"""
logger.info("Starting ZKP protocol orchestration.")
r, t = self.prover.generate_commitment()
c = self.verifier.issue_challenge()
z = self.prover.respond_to_challenge(r, c)
is_valid = self.verifier.verify_proof(t, z, c)
logger.info("ZKP protocol completed. Valid proof: {}", is_valid)
return is_valid
if __name__ == "__main__":
# Example parameters
p = 23 # Prime number
g = 5 # Generator
secret = 7 # Prover's secret
# Initialize the Coordinator and run the protocol
coordinator = CoordinatorAgent(p, g, secret)
result = coordinator.orchestrate()
print(f"Zero Knowledge Proof Verification Result: {'Valid' if result else 'Invalid'}")