From 117c9bd559903f9a30d6e9d969d703c2b1754959 Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Sat, 7 Dec 2024 11:37:02 -0800 Subject: [PATCH] [CLEANUP] --- Dockerfile | 2 - example.py | 27 +- fastrag.py | 387 ++++++++++++++++ .../fund_manager_forest.py | 147 ++++++ .../medical_forest_swarm.py | 150 ++++++ .../forest_swarm_examples/tree_swarm_test.py | 0 pyproject.toml | 2 +- sol_agent.py | 433 ------------------ swarm_router_example.py | 165 +++++++ swarms/agents/__init__.py | 2 +- swarms/cli/create_agent.py | 94 ++-- .../stopping_conditions.py | 0 swarms/telemetry/capture_sys_data.py | 2 +- swarms/utils/loguru_logger.py | 86 +++- zpk.py | 206 --------- 15 files changed, 978 insertions(+), 725 deletions(-) create mode 100644 fastrag.py create mode 100644 new_features_examples/forest_swarm_examples/fund_manager_forest.py create mode 100644 new_features_examples/forest_swarm_examples/medical_forest_swarm.py rename tree_swarm_test.py => new_features_examples/forest_swarm_examples/tree_swarm_test.py (100%) delete mode 100644 sol_agent.py create mode 100644 swarm_router_example.py rename swarms/{agents => structs}/stopping_conditions.py (100%) delete mode 100644 zpk.py diff --git a/Dockerfile b/Dockerfile index f7d0175f..08c42d55 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,3 @@ - -# ================================== # Use an official Python runtime as a parent image FROM python:3.11-slim diff --git a/example.py b/example.py index 4f2d2f3f..6ba8a46c 100644 --- a/example.py +++ b/example.py @@ -6,26 +6,25 @@ from swarms.prompts.finance_agent_sys_prompt import ( # Initialize the agent agent = Agent( agent_name="Financial-Analysis-Agent", - system_prompt=FINANCIAL_AGENT_SYS_PROMPT, - model_name="gpt-4o-mini", - max_loops=1, - autosave=True, - dashboard=False, - verbose=True, + agent_description = "Personal finance advisor agent", + system_prompt=FINANCIAL_AGENT_SYS_PROMPT + "Output the token when you're done creating a portfolio of etfs, index, funds, and more for AI", + model_name="gpt-4o", # Use any model from litellm + max_loops="auto", dynamic_temperature_enabled=True, - saved_state_path="finance_agent.json", - user_name="swarms_corp", - retry_attempts=1, + user_name="Kye", + retry_attempts=3, streaming_on=True, - context_length=200000, + context_length=16000, return_step_meta=False, - output_type="str", # "json", "dict", "csv" OR "string" soon "yaml" and + output_type="str", # "json", "dict", "csv" OR "string" "yaml" and auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task - max_tokens=8000, + max_tokens=16000, # max output tokens + interactive = True, + stopping_token="", + execute_tool=True, ) - agent.run( - "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria. Create a report on this question.", + "Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.", all_cores=True, ) diff --git a/fastrag.py b/fastrag.py new file mode 100644 index 00000000..20839bf1 --- /dev/null +++ b/fastrag.py @@ -0,0 +1,387 @@ +from typing import List, Dict, Optional, Union, Any +from dataclasses import dataclass +from pathlib import Path +import numpy as np +from scipy.sparse import csr_matrix +from sklearn.cluster import AgglomerativeClustering +from sentence_transformers import SentenceTransformer +import faiss +import pickle +import time +from loguru import logger +from concurrent.futures import ThreadPoolExecutor +import threading +import uuid + +@dataclass +class Document: + """Represents a document in the HQD-RAG system. + + Attributes: + id (str): Unique identifier for the document + content (str): Raw text content of the document + embedding (Optional[np.ndarray]): Quantum-inspired embedding vector + cluster_id (Optional[int]): ID of the cluster this document belongs to + """ + id: str + content: str + embedding: Optional[np.ndarray] = None + cluster_id: Optional[int] = None + +class HQDRAG: + """ + Hierarchical Quantum-Inspired Distributed RAG (HQD-RAG) System + + A production-grade implementation of the HQD-RAG algorithm for ultra-fast + and reliable document retrieval. Uses quantum-inspired embeddings and + hierarchical clustering for efficient search. + + Attributes: + embedding_dim (int): Dimension of the quantum-inspired embeddings + num_clusters (int): Number of hierarchical clusters + similarity_threshold (float): Threshold for quantum similarity matching + reliability_threshold (float): Threshold for reliability verification + """ + + def __init__( + self, + embedding_dim: int = 768, + num_clusters: int = 128, + similarity_threshold: float = 0.75, + reliability_threshold: float = 0.85, + model_name: str = "all-MiniLM-L6-v2" + ): + """Initialize the HQD-RAG system. + + Args: + embedding_dim: Dimension of document embeddings + num_clusters: Number of clusters for hierarchical organization + similarity_threshold: Minimum similarity score for retrieval + reliability_threshold: Minimum reliability score for verification + model_name: Name of the sentence transformer model to use + """ + logger.info(f"Initializing HQD-RAG with {embedding_dim} dimensions") + + self.embedding_dim = embedding_dim + self.num_clusters = num_clusters + self.similarity_threshold = similarity_threshold + self.reliability_threshold = reliability_threshold + + # Initialize components + self.documents: Dict[str, Document] = {} + self.encoder = SentenceTransformer(model_name) + self.index = faiss.IndexFlatIP(embedding_dim) # Inner product index + self.clustering = AgglomerativeClustering( + n_clusters=num_clusters, + metric='euclidean', + linkage='ward' + ) + + # Thread safety + self._lock = threading.Lock() + self._executor = ThreadPoolExecutor(max_workers=4) + + logger.info("HQD-RAG system initialized successfully") + + def _compute_quantum_embedding(self, text: str) -> np.ndarray: + """Compute quantum-inspired embedding for text. + + Args: + text: Input text to embed + + Returns: + Quantum-inspired embedding vector + """ + # Get base embedding + base_embedding = self.encoder.encode([text])[0] + + # Apply quantum-inspired transformation + # Simulate superposition by adding phase components + phase = np.exp(2j * np.pi * np.random.random(self.embedding_dim)) + quantum_embedding = base_embedding * phase + + # Normalize to unit length + return quantum_embedding / np.linalg.norm(quantum_embedding) + + def _verify_reliability(self, doc: Document, query_embedding: np.ndarray) -> float: + """Verify the reliability of a document match. + + Args: + doc: Document to verify + query_embedding: Query embedding vector + + Returns: + Reliability score between 0 and 1 + """ + if doc.embedding is None: + return 0.0 + + # Compute consistency score + consistency = np.abs(np.dot(doc.embedding, query_embedding)) + + # Add quantum noise resistance check + noise = np.random.normal(0, 0.1, self.embedding_dim) + noisy_query = query_embedding + noise + noisy_query = noisy_query / np.linalg.norm(noisy_query) + noise_resistance = np.abs(np.dot(doc.embedding, noisy_query)) + + return (consistency + noise_resistance) / 2 + + def add(self, content: str, doc_id: Optional[str] = None) -> str: + """Add a document to the system. + + Args: + content: Document text content + doc_id: Optional custom document ID + + Returns: + Document ID + """ + doc_id = doc_id or str(uuid.uuid4()) + + with self._lock: + try: + # Compute embedding + embedding = self._compute_quantum_embedding(content) + + # Create document + doc = Document( + id=doc_id, + content=content, + embedding=embedding + ) + + # Add to storage + self.documents[doc_id] = doc + self.index.add(embedding.reshape(1, -1)) + + # Update clustering + self._update_clusters() + + logger.info(f"Successfully added document {doc_id}") + return doc_id + + except Exception as e: + logger.error(f"Error adding document: {str(e)}") + raise + + def query( + self, + query: str, + k: int = 5, + return_scores: bool = False + ) -> Union[List[str], List[tuple[str, float]]]: + """Query the system for relevant documents. + + Args: + query: Query text + k: Number of results to return + return_scores: Whether to return similarity scores + + Returns: + List of document IDs or (document ID, score) tuples + """ + try: + # Compute query embedding + query_embedding = self._compute_quantum_embedding(query) + + # Search index + scores, indices = self.index.search( + query_embedding.reshape(1, -1), + k * 2 # Get extra results for reliability filtering + ) + + results = [] + for score, idx in zip(scores[0], indices[0]): + # Get document + doc_id = list(self.documents.keys())[idx] + doc = self.documents[doc_id] + + # Verify reliability + reliability = self._verify_reliability(doc, query_embedding) + + if reliability >= self.reliability_threshold: + results.append((doc_id, float(score))) + + if len(results) >= k: + break + + logger.info(f"Query returned {len(results)} results") + + if return_scores: + return results + return [doc_id for doc_id, _ in results] + + except Exception as e: + logger.error(f"Error processing query: {str(e)}") + raise + + def update(self, doc_id: str, new_content: str) -> None: + """Update an existing document. + + Args: + doc_id: ID of document to update + new_content: New document content + """ + with self._lock: + try: + if doc_id not in self.documents: + raise KeyError(f"Document {doc_id} not found") + + # Remove old embedding + old_doc = self.documents[doc_id] + if old_doc.embedding is not None: + self.index.remove_ids(np.array([list(self.documents.keys()).index(doc_id)])) + + # Compute new embedding + new_embedding = self._compute_quantum_embedding(new_content) + + # Update document + self.documents[doc_id] = Document( + id=doc_id, + content=new_content, + embedding=new_embedding + ) + + # Add new embedding + self.index.add(new_embedding.reshape(1, -1)) + + # Update clustering + self._update_clusters() + + logger.info(f"Successfully updated document {doc_id}") + + except Exception as e: + logger.error(f"Error updating document: {str(e)}") + raise + + def delete(self, doc_id: str) -> None: + """Delete a document from the system. + + Args: + doc_id: ID of document to delete + """ + with self._lock: + try: + if doc_id not in self.documents: + raise KeyError(f"Document {doc_id} not found") + + # Remove from index + idx = list(self.documents.keys()).index(doc_id) + self.index.remove_ids(np.array([idx])) + + # Remove from storage + del self.documents[doc_id] + + # Update clustering + self._update_clusters() + + logger.info(f"Successfully deleted document {doc_id}") + + except Exception as e: + logger.error(f"Error deleting document: {str(e)}") + raise + + def _update_clusters(self) -> None: + """Update hierarchical document clusters.""" + if len(self.documents) < 2: + return + + # Get all embeddings + embeddings = np.vstack([ + doc.embedding for doc in self.documents.values() + if doc.embedding is not None + ]) + + # Update clustering + clusters = self.clustering.fit_predict(embeddings) + + # Assign cluster IDs + for doc, cluster_id in zip(self.documents.values(), clusters): + doc.cluster_id = int(cluster_id) + + def save(self, path: Union[str, Path]) -> None: + """Save the system state to disk. + + Args: + path: Path to save directory + """ + path = Path(path) + path.mkdir(parents=True, exist_ok=True) + + try: + # Save documents + with open(path / "documents.pkl", "wb") as f: + pickle.dump(self.documents, f) + + # Save index + faiss.write_index(self.index, str(path / "index.faiss")) + + logger.info(f"Successfully saved system state to {path}") + + except Exception as e: + logger.error(f"Error saving system state: {str(e)}") + raise + + def load(self, path: Union[str, Path]) -> None: + """Load the system state from disk. + + Args: + path: Path to save directory + """ + path = Path(path) + + try: + # Load documents + with open(path / "documents.pkl", "rb") as f: + self.documents = pickle.load(f) + + # Load index + self.index = faiss.read_index(str(path / "index.faiss")) + + logger.info(f"Successfully loaded system state from {path}") + + except Exception as e: + logger.error(f"Error loading system state: {str(e)}") + raise + +# Example usage: +if __name__ == "__main__": + # Configure logging + logger.add( + "hqd_rag.log", + rotation="1 day", + retention="1 week", + level="INFO" + ) + + # Initialize system + rag = HQDRAG() + + # Add some documents + doc_ids = [] + docs = [ + "The quick brown fox jumps over the lazy dog", + "Machine learning is a subset of artificial intelligence", + "Python is a popular programming language" + ] + + for doc in docs: + doc_id = rag.add(doc) + doc_ids.append(doc_id) + + # Query + results = rag.query("What is machine learning?", return_scores=True) + print("Query results:", results) + + # # Update a document + # rag.update(doc_ids[0], "The fast brown fox jumps over the sleepy dog") + + # # Delete a document + # rag.delete(doc_ids[-1]) + + # # Save state + # rag.save("hqd_rag_state") + + + \ No newline at end of file diff --git a/new_features_examples/forest_swarm_examples/fund_manager_forest.py b/new_features_examples/forest_swarm_examples/fund_manager_forest.py new file mode 100644 index 00000000..afce82cf --- /dev/null +++ b/new_features_examples/forest_swarm_examples/fund_manager_forest.py @@ -0,0 +1,147 @@ +from swarms.structs.tree_swarm import ForestSwarm, Tree, TreeAgent + +# Fund Analysis Tree +fund_agents = [ + TreeAgent( + system_prompt="""Mutual Fund Analysis Agent: + - Analyze mutual fund performance metrics and ratios + - Evaluate fund manager track records and strategy consistency + - Compare expense ratios and fee structures + - Assess fund holdings and sector allocations + - Monitor fund inflows/outflows and size implications + - Analyze risk-adjusted returns (Sharpe, Sortino ratios) + - Consider tax efficiency and distribution history + - Track style drift and benchmark adherence + Knowledge base: Mutual fund operations, portfolio management, fee structures + Output format: Fund analysis report with recommendations""", + agent_name="Mutual Fund Analyst", + ), + TreeAgent( + system_prompt="""Index Fund Specialist Agent: + - Evaluate index tracking accuracy and tracking error + - Compare different index methodologies + - Analyze index fund costs and tax efficiency + - Monitor index rebalancing impacts + - Assess market capitalization weightings + - Compare similar indices and their differences + - Evaluate smart beta and factor strategies + Knowledge base: Index construction, passive investing, market efficiency + Output format: Index fund comparison and selection recommendations""", + agent_name="Index Fund Specialist", + ), + TreeAgent( + system_prompt="""ETF Strategy Agent: + - Analyze ETF liquidity and trading volumes + - Evaluate creation/redemption mechanisms + - Compare ETF spreads and premium/discount patterns + - Assess underlying asset liquidity + - Monitor authorized participant activity + - Analyze securities lending revenue + - Compare similar ETFs and their structures + Knowledge base: ETF mechanics, trading strategies, market making + Output format: ETF analysis with trading recommendations""", + agent_name="ETF Strategist", + ), +] + +# Sector Specialist Tree +sector_agents = [ + TreeAgent( + system_prompt="""Energy Sector Analysis Agent: + - Track global energy market trends + - Analyze traditional and renewable energy companies + - Monitor regulatory changes and policy impacts + - Evaluate commodity price influences + - Assess geopolitical risk factors + - Track technological disruption in energy + - Analyze energy infrastructure investments + Knowledge base: Energy markets, commodities, regulatory environment + Output format: Energy sector analysis with investment opportunities""", + agent_name="Energy Sector Analyst", + ), + TreeAgent( + system_prompt="""AI and Technology Specialist Agent: + - Research AI company fundamentals and growth metrics + - Evaluate AI technology adoption trends + - Analyze AI chip manufacturers and supply chains + - Monitor AI software and service providers + - Track AI patent filings and R&D investments + - Assess competitive positioning in AI market + - Consider regulatory risks and ethical factors + Knowledge base: AI technology, semiconductor industry, tech sector dynamics + Output format: AI sector analysis with investment recommendations""", + agent_name="AI Technology Analyst", + ), + TreeAgent( + system_prompt="""Market Infrastructure Agent: + - Monitor trading platform stability + - Analyze market maker activity + - Track exchange system updates + - Evaluate clearing house operations + - Monitor settlement processes + - Assess cybersecurity measures + - Track regulatory compliance updates + Knowledge base: Market structure, trading systems, regulatory requirements + Output format: Market infrastructure assessment and risk analysis""", + agent_name="Infrastructure Monitor", + ), +] + +# Trading Strategy Tree +strategy_agents = [ + TreeAgent( + system_prompt="""Portfolio Strategy Agent: + - Develop asset allocation strategies + - Implement portfolio rebalancing rules + - Monitor portfolio risk metrics + - Optimize position sizing + - Calculate portfolio correlation matrices + - Implement tax-loss harvesting strategies + - Track portfolio performance attribution + Knowledge base: Portfolio theory, risk management, asset allocation + Output format: Portfolio strategy recommendations with implementation plan""", + agent_name="Portfolio Strategist", + ), + TreeAgent( + system_prompt="""Technical Analysis Agent: + - Analyze price patterns and trends + - Calculate technical indicators + - Identify support/resistance levels + - Monitor volume and momentum indicators + - Track market breadth metrics + - Analyze intermarket relationships + - Generate trading signals + Knowledge base: Technical analysis, chart patterns, market indicators + Output format: Technical analysis report with trade signals""", + agent_name="Technical Analyst", + ), + TreeAgent( + system_prompt="""Risk Management Agent: + - Calculate position-level risk metrics + - Monitor portfolio VaR and stress tests + - Track correlation changes + - Implement stop-loss strategies + - Monitor margin requirements + - Assess liquidity risk factors + - Generate risk alerts and warnings + Knowledge base: Risk metrics, position sizing, risk modeling + Output format: Risk assessment report with mitigation recommendations""", + agent_name="Risk Manager", + ), +] + +# Create trees +fund_tree = Tree(tree_name="Fund Analysis", agents=fund_agents) +sector_tree = Tree(tree_name="Sector Analysis", agents=sector_agents) +strategy_tree = Tree( + tree_name="Trading Strategy", agents=strategy_agents +) + +# Create the ForestSwarm +trading_forest = ForestSwarm( + trees=[fund_tree, sector_tree, strategy_tree] +) + +# Example usage +task = "Analyze current opportunities in AI sector ETFs considering market conditions and provide a risk-adjusted portfolio allocation strategy. Add in the names of the best AI etfs that are reliable and align with this strategy and also include where to purchase the etfs" +result = trading_forest.run(task) diff --git a/new_features_examples/forest_swarm_examples/medical_forest_swarm.py b/new_features_examples/forest_swarm_examples/medical_forest_swarm.py new file mode 100644 index 00000000..21e35acb --- /dev/null +++ b/new_features_examples/forest_swarm_examples/medical_forest_swarm.py @@ -0,0 +1,150 @@ +from swarms.structs.tree_swarm import ForestSwarm, Tree, TreeAgent + +# Diagnostic Specialists Tree +diagnostic_agents = [ + TreeAgent( + system_prompt="""Primary Care Diagnostic Agent: + - Conduct initial patient assessment and triage + - Analyze patient symptoms, vital signs, and medical history + - Identify red flags and emergency conditions + - Coordinate with specialist agents for complex cases + - Provide preliminary diagnosis recommendations + - Consider common conditions and their presentations + - Factor in patient demographics and risk factors + Medical knowledge base: General medicine, common conditions, preventive care + Output format: Structured assessment with recommended next steps""", + agent_name="Primary Diagnostician", + ), + TreeAgent( + system_prompt="""Laboratory Analysis Agent: + - Interpret complex laboratory results + - Recommend appropriate test panels based on symptoms + - Analyze blood work, urinalysis, and other diagnostic tests + - Identify abnormal results and their clinical significance + - Suggest follow-up tests when needed + - Consider test accuracy and false positive/negative rates + - Integrate lab results with clinical presentation + Medical knowledge base: Clinical pathology, laboratory medicine, test interpretation + Output format: Detailed lab analysis with clinical correlations""", + agent_name="Lab Analyst", + ), + TreeAgent( + system_prompt="""Medical Imaging Specialist Agent: + - Analyze radiological images (X-rays, CT, MRI, ultrasound) + - Identify anatomical abnormalities and pathological changes + - Recommend appropriate imaging studies + - Correlate imaging findings with clinical symptoms + - Provide differential diagnoses based on imaging + - Consider radiation exposure and cost-effectiveness + - Suggest follow-up imaging when needed + Medical knowledge base: Radiology, anatomy, pathological imaging patterns + Output format: Structured imaging report with findings and recommendations""", + agent_name="Imaging Specialist", + ), +] + +# Treatment Specialists Tree +treatment_agents = [ + TreeAgent( + system_prompt="""Treatment Planning Agent: + - Develop comprehensive treatment plans based on diagnosis + - Consider evidence-based treatment guidelines + - Account for patient factors (age, comorbidities, preferences) + - Evaluate treatment risks and benefits + - Consider cost-effectiveness and accessibility + - Plan for treatment monitoring and adjustment + - Coordinate multi-modal treatment approaches + Medical knowledge base: Clinical guidelines, treatment protocols, medical management + Output format: Detailed treatment plan with rationale and monitoring strategy""", + agent_name="Treatment Planner", + ), + TreeAgent( + system_prompt="""Medication Management Agent: + - Recommend appropriate medications and dosing + - Check for drug interactions and contraindications + - Consider patient-specific factors affecting medication choice + - Provide medication administration guidelines + - Monitor for adverse effects and therapeutic response + - Suggest alternatives for contraindicated medications + - Plan medication tapering or adjustments + Medical knowledge base: Pharmacology, drug interactions, clinical pharmacotherapy + Output format: Medication plan with monitoring parameters""", + agent_name="Medication Manager", + ), + TreeAgent( + system_prompt="""Specialist Intervention Agent: + - Recommend specialized procedures and interventions + - Evaluate need for surgical vs. non-surgical approaches + - Consider procedural risks and benefits + - Provide pre- and post-procedure care guidelines + - Coordinate with other specialists + - Plan follow-up care and monitoring + - Handle complex cases requiring multiple interventions + Medical knowledge base: Surgical procedures, specialized interventions, perioperative care + Output format: Intervention plan with risk assessment and care protocol""", + agent_name="Intervention Specialist", + ), +] + +# Follow-up and Monitoring Tree +followup_agents = [ + TreeAgent( + system_prompt="""Recovery Monitoring Agent: + - Track patient progress and treatment response + - Identify complications or adverse effects early + - Adjust treatment plans based on response + - Coordinate follow-up appointments and tests + - Monitor vital signs and symptoms + - Evaluate treatment adherence and barriers + - Recommend lifestyle modifications + Medical knowledge base: Recovery patterns, complications, monitoring protocols + Output format: Progress report with recommendations""", + agent_name="Recovery Monitor", + ), + TreeAgent( + system_prompt="""Preventive Care Agent: + - Develop preventive care strategies + - Recommend appropriate screening tests + - Provide lifestyle and dietary guidance + - Monitor risk factors for disease progression + - Coordinate vaccination schedules + - Suggest health maintenance activities + - Plan long-term health monitoring + Medical knowledge base: Preventive medicine, health maintenance, risk reduction + Output format: Preventive care plan with timeline""", + agent_name="Prevention Specialist", + ), + TreeAgent( + system_prompt="""Patient Education Agent: + - Provide comprehensive patient education + - Explain conditions and treatments in accessible language + - Develop self-management strategies + - Create educational materials and resources + - Address common questions and concerns + - Provide lifestyle modification guidance + - Support treatment adherence + Medical knowledge base: Patient education, health literacy, behavior change + Output format: Educational plan with resources and materials""", + agent_name="Patient Educator", + ), +] + +# Create trees +diagnostic_tree = Tree( + tree_name="Diagnostic Specialists", agents=diagnostic_agents +) +treatment_tree = Tree( + tree_name="Treatment Specialists", agents=treatment_agents +) +followup_tree = Tree( + tree_name="Follow-up and Monitoring", agents=followup_agents +) + +# Create the ForestSwarm +medical_forest = ForestSwarm( + trees=[diagnostic_tree, treatment_tree, followup_tree] +) + +# Example usage +task = "Patient presents with persistent headache for 2 weeks, accompanied by visual disturbances and neck stiffness. Need comprehensive evaluation and treatment plan." +result = medical_forest.run(task) diff --git a/tree_swarm_test.py b/new_features_examples/forest_swarm_examples/tree_swarm_test.py similarity index 100% rename from tree_swarm_test.py rename to new_features_examples/forest_swarm_examples/tree_swarm_test.py diff --git a/pyproject.toml b/pyproject.toml index 6df29882..6e3bfd87 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,7 +63,7 @@ asyncio = ">=3.4.3,<4.0" toml = "*" pypdf = "4.3.1" loguru = "*" -pydantic = ">=2.8.2<3.0" +pydantic = "2.8.2" tenacity = "*" psutil = "*" sentry-sdk = {version = "*", extras = ["http"]} # Updated here diff --git a/sol_agent.py b/sol_agent.py deleted file mode 100644 index 09319d0e..00000000 --- a/sol_agent.py +++ /dev/null @@ -1,433 +0,0 @@ -import asyncio -import json -from dataclasses import asdict, dataclass -from datetime import datetime -from typing import Dict, List, Optional, Set - -import aiohttp -import matplotlib.pyplot as plt -import networkx as nx -import websockets -from loguru import logger - -from swarms import Agent - -TREND_AGENT_PROMPT = """You are a specialized blockchain trend analysis agent. Your role: -1. Analyze transaction patterns in Solana blockchain data -2. Identify volume trends, price movements, and temporal patterns -3. Focus on whale movements and their market impact -4. Format findings in clear, structured JSON -5. Include confidence scores for each insight -6. Flag unusual patterns or anomalies -7. Provide historical context for significant movements - -Output format: -{ - "trends": [ - {"pattern": str, "confidence": float, "impact": str} - ], - "whale_activity": {...}, - "temporal_analysis": {...} -}""" - -RISK_AGENT_PROMPT = """You are a blockchain risk assessment specialist. Your tasks: -1. Identify suspicious transaction patterns -2. Monitor for known exploit signatures -3. Assess wallet clustering and relationship patterns -4. Evaluate transaction velocity and size anomalies -5. Check for bridge-related risks -6. Monitor smart contract interactions -7. Flag potential wash trading - -Output format: -{ - "risk_score": float, - "flags": [...], - "recommendations": [...] -}""" - -SUMMARY_AGENT_PROMPT = """You are a blockchain data synthesis expert. Your responsibilities: -1. Combine insights from trend and risk analyses -2. Prioritize actionable intelligence -3. Highlight critical patterns -4. Generate executive summaries -5. Provide market context -6. Make predictions with confidence intervals -7. Suggest trading strategies based on data - -Output format: -{ - "key_insights": [...], - "market_impact": str, - "recommendations": {...} -}""" - - -@dataclass -class Transaction: - signature: str - timestamp: datetime - amount: float - from_address: str - to_address: str - - -class SolanaRPC: - def __init__( - self, endpoint="https://api.mainnet-beta.solana.com" - ): - self.endpoint = endpoint - self.session = None - - async def get_signatures(self, address: str) -> List[Dict]: - if not self.session: - self.session = aiohttp.ClientSession() - - payload = { - "jsonrpc": "2.0", - "id": 1, - "method": "getSignaturesForAddress", - "params": [address, {"limit": 100}], - } - - async with self.session.post( - self.endpoint, json=payload - ) as response: - result = await response.json() - return result.get("result", []) - - async def get_transaction(self, signature: str) -> Dict: - payload = { - "jsonrpc": "2.0", - "id": 1, - "method": "getTransaction", - "params": [ - signature, - { - "encoding": "json", - "maxSupportedTransactionVersion": 0, - }, - ], - } - - async with self.session.post( - self.endpoint, json=payload - ) as response: - result = await response.json() - return result.get("result", {}) - - -class AlertSystem: - def __init__(self, email: str, threshold: float = 1000.0): - self.email = email - self.threshold = threshold - self.smtp_server = "smtp.gmail.com" - self.smtp_port = 587 - - async def check_and_alert( - self, transaction: Transaction, risk_score: float - ): - if transaction.amount > self.threshold or risk_score > 0.8: - await self.send_alert(transaction, risk_score) - - async def send_alert( - self, transaction: Transaction, risk_score: float - ): - # msg = MIMEText( - # f"High-risk transaction detected:\n" - # f"Amount: {transaction.amount} SOL\n" - # f"Risk Score: {risk_score}\n" - # f"Signature: {transaction.signature}" - # ) - logger.info( - f"Alert sent for transaction {transaction.signature}" - ) - - -class WalletClusterAnalyzer: - def __init__(self): - self.graph = nx.Graph() - self.known_wallets: Set[str] = set() - - def update_graph(self, transaction: Transaction): - self.graph.add_edge( - transaction.from_address, - transaction.to_address, - weight=transaction.amount, - ) - self.known_wallets.add(transaction.from_address) - self.known_wallets.add(transaction.to_address) - - def identify_clusters(self) -> Dict: - communities = nx.community.greedy_modularity_communities( - self.graph - ) - return { - "clusters": [list(c) for c in communities], - "central_wallets": [ - wallet - for wallet in self.known_wallets - if self.graph.degree[wallet] > 5 - ], - } - - -class TransactionVisualizer: - def __init__(self): - self.transaction_history = [] - - def add_transaction(self, transaction: Transaction): - self.transaction_history.append(asdict(transaction)) - - def generate_volume_chart(self) -> str: - volumes = [tx["amount"] for tx in self.transaction_history] - plt.figure(figsize=(12, 6)) - plt.plot(volumes) - plt.title("Transaction Volume Over Time") - plt.savefig("volume_chart.png") - return "volume_chart.png" - - def generate_network_graph( - self, wallet_analyzer: WalletClusterAnalyzer - ) -> str: - plt.figure(figsize=(15, 15)) - pos = nx.spring_layout(wallet_analyzer.graph) - nx.draw( - wallet_analyzer.graph, - pos, - node_size=1000, - node_color="lightblue", - with_labels=True, - ) - plt.savefig("network_graph.png") - return "network_graph.png" - - -class SolanaMultiAgentAnalyzer: - def __init__( - self, - min_amount: float = 50.0, - websocket_url: str = "wss://api.mainnet-beta.solana.com", - alert_email: str = None, - ): - self.rpc = SolanaRPC() - self.websocket_url = websocket_url - self.min_amount = min_amount - self.transactions = [] - - self.wallet_analyzer = WalletClusterAnalyzer() - self.visualizer = TransactionVisualizer() - self.alert_system = ( - AlertSystem(alert_email) if alert_email else None - ) - - self.trend_agent = Agent( - agent_name="trend-analyzer", - system_prompt=TREND_AGENT_PROMPT, - model_name="gpt-4o-mini", - max_loops=1, - streaming_on=True, - ) - - self.risk_agent = Agent( - agent_name="risk-analyzer", - system_prompt=RISK_AGENT_PROMPT, - model_name="gpt-4o-mini", - max_loops=1, - streaming_on=True, - ) - - self.summary_agent = Agent( - agent_name="summary-agent", - system_prompt=SUMMARY_AGENT_PROMPT, - model_name="gpt-4o-mini", - max_loops=1, - streaming_on=True, - ) - - logger.add( - "solana_analysis.log", rotation="500 MB", level="INFO" - ) - - async def start_websocket_stream(self): - async with websockets.connect( - self.websocket_url - ) as websocket: - subscribe_message = { - "jsonrpc": "2.0", - "id": 1, - "method": "programSubscribe", - "params": [ - "11111111111111111111111111111111", - {"encoding": "json", "commitment": "confirmed"}, - ], - } - await websocket.send(json.dumps(subscribe_message)) - - while True: - try: - msg = await websocket.recv() - transaction = await self.parse_websocket_message( - msg - ) - if ( - transaction - and transaction.amount >= self.min_amount - ): - await self.process_transaction(transaction) - except Exception as e: - logger.error(f"Websocket error: {e}") - await asyncio.sleep(5) - - async def parse_websocket_message( - self, msg: str - ) -> Optional[Transaction]: - try: - data = json.loads(msg) - if "params" in data and "result" in data["params"]: - tx_data = data["params"]["result"] - return Transaction( - signature=tx_data["signature"], - timestamp=datetime.fromtimestamp( - tx_data["blockTime"] - ), - amount=float( - tx_data["meta"]["postBalances"][0] - - tx_data["meta"]["preBalances"][0] - ) - / 1e9, - from_address=tx_data["transaction"]["message"][ - "accountKeys" - ][0], - to_address=tx_data["transaction"]["message"][ - "accountKeys" - ][1], - ) - except Exception as e: - logger.error(f"Error parsing websocket message: {e}") - return None - - async def process_transaction(self, transaction: Transaction): - self.wallet_analyzer.update_graph(transaction) - self.visualizer.add_transaction(transaction) - - risk_analysis = await self.risk_agent.run( - f"Analyze risk for transaction: {json.dumps(asdict(transaction))}" - ) - - if self.alert_system: - await self.alert_system.check_and_alert( - transaction, risk_analysis.get("risk_score", 0) - ) - - async def fetch_transactions(self) -> List[Transaction]: - try: - signatures = await self.rpc.get_signatures( - "11111111111111111111111111111111" - ) - transactions = [] - - for sig_info in signatures: - tx_data = await self.rpc.get_transaction( - sig_info["signature"] - ) - if not tx_data or "meta" not in tx_data: - continue - - pre_balances = tx_data["meta"]["preBalances"] - post_balances = tx_data["meta"]["postBalances"] - amount = abs(pre_balances[0] - post_balances[0]) / 1e9 - - if amount >= self.min_amount: - tx = Transaction( - signature=sig_info["signature"], - timestamp=datetime.fromtimestamp( - tx_data["blockTime"] - ), - amount=amount, - from_address=tx_data["transaction"][ - "message" - ]["accountKeys"][0], - to_address=tx_data["transaction"]["message"][ - "accountKeys" - ][1], - ) - transactions.append(tx) - - return transactions - except Exception as e: - logger.error(f"Error fetching transactions: {e}") - return [] - - async def analyze_transactions( - self, transactions: List[Transaction] - ) -> Dict: - tx_data = [asdict(tx) for tx in transactions] - cluster_data = self.wallet_analyzer.identify_clusters() - - trend_analysis = await self.trend_agent.run( - f"Analyze trends in: {json.dumps(tx_data)}" - ) - print(trend_analysis) - - risk_analysis = await self.risk_agent.run( - f"Analyze risks in: {json.dumps({'transactions': tx_data, 'clusters': cluster_data})}" - ) - print(risk_analysis) - - summary = await self.summary_agent.run( - f"Synthesize insights from: {trend_analysis}, {risk_analysis}" - ) - - print(summary) - - volume_chart = self.visualizer.generate_volume_chart() - network_graph = self.visualizer.generate_network_graph( - self.wallet_analyzer - ) - - return { - "transactions": tx_data, - "trend_analysis": trend_analysis, - "risk_analysis": risk_analysis, - "cluster_analysis": cluster_data, - "summary": summary, - "visualizations": { - "volume_chart": volume_chart, - "network_graph": network_graph, - }, - } - - async def run_continuous_analysis(self): - logger.info("Starting continuous analysis") - asyncio.create_task(self.start_websocket_stream()) - - while True: - try: - transactions = await self.fetch_transactions() - if transactions: - analysis = await self.analyze_transactions( - transactions - ) - timestamp = datetime.now().strftime( - "%Y%m%d_%H%M%S" - ) - with open(f"analysis_{timestamp}.json", "w") as f: - json.dump(analysis, f, indent=2, default=str) - logger.info( - f"Analysis completed: analysis_{timestamp}.json" - ) - await asyncio.sleep(60) - except Exception as e: - logger.error(f"Error in analysis loop: {e}") - await asyncio.sleep(60) - - -# Add to __main__: -if __name__ == "__main__": - logger.info("Starting Solana analyzer...") - analyzer = SolanaMultiAgentAnalyzer(alert_email="your@email.com") - try: - asyncio.run(analyzer.run_continuous_analysis()) - except Exception as e: - logger.error(f"Critical error: {e}") diff --git a/swarm_router_example.py b/swarm_router_example.py new file mode 100644 index 00000000..d7397457 --- /dev/null +++ b/swarm_router_example.py @@ -0,0 +1,165 @@ + +from swarms import Agent, SwarmRouter + +# Portfolio Analysis Specialist +portfolio_analyzer = Agent( + agent_name="Portfolio-Analysis-Specialist", + system_prompt="""You are an expert portfolio analyst specializing in fund analysis and selection. Your core competencies include: + - Comprehensive analysis of mutual funds, ETFs, and index funds + - Evaluation of fund performance metrics (expense ratios, tracking error, Sharpe ratio) + - Assessment of fund composition and strategy alignment + - Risk-adjusted return analysis + - Tax efficiency considerations + + For each portfolio analysis: + 1. Evaluate fund characteristics and performance metrics + 2. Analyze expense ratios and fee structures + 3. Assess historical performance and volatility + 4. Compare funds within same category + 5. Consider tax implications + 6. Review fund manager track record and strategy consistency + + Maintain focus on cost-efficiency and alignment with investment objectives.""", + model_name="gpt-4o", + max_loops=1, + saved_state_path="portfolio_analyzer.json", + user_name="investment_team", + retry_attempts=2, + context_length=200000, + output_type="string", +) + +# Asset Allocation Strategist +allocation_strategist = Agent( + agent_name="Asset-Allocation-Strategist", + system_prompt="""You are a specialized asset allocation strategist focused on portfolio construction and optimization. Your expertise includes: + - Strategic and tactical asset allocation + - Risk tolerance assessment and portfolio matching + - Geographic and sector diversification + - Rebalancing strategy development + - Portfolio optimization using modern portfolio theory + + For each allocation: + 1. Analyze investor risk tolerance and objectives + 2. Develop appropriate asset class weights + 3. Select optimal fund combinations + 4. Design rebalancing triggers and schedules + 5. Consider tax-efficient fund placement + 6. Account for correlation between assets + + Focus on creating well-diversified portfolios aligned with client goals and risk tolerance.""", + model_name="gpt-4o", + max_loops=1, + saved_state_path="allocation_strategist.json", + user_name="investment_team", + retry_attempts=2, + context_length=200000, + output_type="string", +) + +# Risk Management Specialist +risk_manager = Agent( + agent_name="Risk-Management-Specialist", + system_prompt="""You are a risk management specialist focused on portfolio risk assessment and mitigation. Your expertise covers: + - Portfolio risk metrics analysis + - Downside protection strategies + - Correlation analysis between funds + - Stress testing and scenario analysis + - Market condition impact assessment + + For each portfolio: + 1. Calculate key risk metrics (Beta, Standard Deviation, etc.) + 2. Analyze correlation matrices + 3. Perform stress tests under various scenarios + 4. Evaluate liquidity risks + 5. Assess concentration risks + 6. Monitor factor exposures + + Focus on maintaining appropriate risk levels while maximizing risk-adjusted returns.""", + model_name="gpt-4o", + max_loops=1, + saved_state_path="risk_manager.json", + user_name="investment_team", + retry_attempts=2, + context_length=200000, + output_type="string", +) + +# Portfolio Implementation Specialist +implementation_specialist = Agent( + agent_name="Portfolio-Implementation-Specialist", + system_prompt="""You are a portfolio implementation specialist focused on efficient execution and maintenance. Your responsibilities include: + - Fund selection for specific asset class exposure + - Tax-efficient implementation strategies + - Portfolio rebalancing execution + - Trading cost analysis + - Cash flow management + + For each implementation: + 1. Select most efficient funds for desired exposure + 2. Plan tax-efficient transitions + 3. Design rebalancing schedule + 4. Optimize trade execution + 5. Manage cash positions + 6. Monitor tracking error + + Maintain focus on minimizing costs and maximizing tax efficiency during implementation.""", + model_name="gpt-4o", + max_loops=1, + saved_state_path="implementation_specialist.json", + user_name="investment_team", + retry_attempts=2, + context_length=200000, + output_type="string", +) + +# Portfolio Monitoring Specialist +monitoring_specialist = Agent( + agent_name="Portfolio-Monitoring-Specialist", + system_prompt="""You are a portfolio monitoring specialist focused on ongoing portfolio oversight and optimization. Your expertise includes: + - Regular portfolio performance review + - Drift monitoring and rebalancing triggers + - Fund changes and replacements + - Tax loss harvesting opportunities + - Performance attribution analysis + + For each review: + 1. Track portfolio drift from targets + 2. Monitor fund performance and changes + 3. Identify tax loss harvesting opportunities + 4. Analyze tracking error and expenses + 5. Review risk metrics evolution + 6. Generate performance attribution reports + + Ensure continuous alignment with investment objectives while maintaining optimal portfolio efficiency.""", + model_name="gpt-4o", + max_loops=1, + saved_state_path="monitoring_specialist.json", + user_name="investment_team", + retry_attempts=2, + context_length=200000, + output_type="string", +) + +# List of all agents for portfolio management +portfolio_agents = [ + portfolio_analyzer, + allocation_strategist, + risk_manager, + implementation_specialist, + monitoring_specialist +] + + +# Router +router = SwarmRouter( + name = "etf-portfolio-management-swarm", + description="Creates and suggests an optimal portfolio", + agents = portfolio_agents, + swarm_type="SequentialWorkflow", # ConcurrentWorkflow + max_loops = 1, +) + +router.run( + task = "I have 10,000$ and I want to create a porfolio based on energy, ai, and datacenter companies. high growth." +) \ No newline at end of file diff --git a/swarms/agents/__init__.py b/swarms/agents/__init__.py index d2156d64..68f75f99 100644 --- a/swarms/agents/__init__.py +++ b/swarms/agents/__init__.py @@ -1,4 +1,4 @@ -from swarms.agents.stopping_conditions import ( +from swarms.structs.stopping_conditions import ( check_cancelled, check_complete, check_done, diff --git a/swarms/cli/create_agent.py b/swarms/cli/create_agent.py index 0f536da6..8e0c5100 100644 --- a/swarms/cli/create_agent.py +++ b/swarms/cli/create_agent.py @@ -1,79 +1,43 @@ from swarms.structs.agent import Agent -from swarms.structs.agent_registry import AgentRegistry -# Registry of agents -agent_registry = AgentRegistry( - name="Swarms CLI", - description="A registry of agents for the Swarms CLI", -) - - -def create_agent( +# Run the agents in the registry +def run_agent_by_name( name: str, system_prompt: str, - max_loops: int = 1, - model_name: str = "gpt-4o", + model_name: str, + max_loops: int, + task: str, + img: str, + *args, + **kwargs, ): """ - Create and initialize an agent with the given parameters. + This function creates an Agent instance and runs a task on it. Args: name (str): The name of the agent. system_prompt (str): The system prompt for the agent. - max_loops (int, optional): The maximum number of loops the agent can perform. Defaults to 1. + model_name (str): The name of the model used by the agent. + max_loops (int): The maximum number of loops the agent can run. + task (str): The task to be run by the agent. + *args: Variable length arguments. + **kwargs: Keyword arguments. Returns: - Agent: The initialized agent. - - """ - # Initialize the agent - agent = Agent( - agent_name=name, - system_prompt=system_prompt, - model_name=model_name, - max_loops=max_loops, - autosave=True, - dashboard=False, - verbose=True, - dynamic_temperature_enabled=True, - saved_state_path=f"{name}.json", - user_name="swarms_corp", - retry_attempts=1, - context_length=200000, - # return_step_meta=True, - # disable_print_every_step=True, - # output_type="json", - interactive=True, - ) - - agent_registry.add(agent) - - return agent - - -# Run the agents in the registry -def run_agent_by_name(name: str, task: str, *args, **kwargs): + The output of the task run by the agent. """ - Run an agent by its name and perform a specified task. - - Parameters: - - name (str): The name of the agent. - - task (str): The task to be performed by the agent. - - *args: Variable length argument list. - - **kwargs: Arbitrary keyword arguments. - - Returns: - - output: The output of the agent's task. - - """ - agent = agent_registry.get_agent_by_name(name) - - output = agent.run(task, *args, **kwargs) - - return output - - -# # Test -# out = create_agent("Accountant1", "Prepares financial statements") -# print(out) + try: + agent = Agent( + agent_name=name, + system_prompt=system_prompt, + model_name=model_name, + max_loops=max_loops, + ) + + output = agent.run(task=task, img=img, *args, **kwargs) + + return output + except Exception as e: + print(f"An error occurred: {str(e)}") + return None diff --git a/swarms/agents/stopping_conditions.py b/swarms/structs/stopping_conditions.py similarity index 100% rename from swarms/agents/stopping_conditions.py rename to swarms/structs/stopping_conditions.py diff --git a/swarms/telemetry/capture_sys_data.py b/swarms/telemetry/capture_sys_data.py index 09d94a70..4a09099b 100644 --- a/swarms/telemetry/capture_sys_data.py +++ b/swarms/telemetry/capture_sys_data.py @@ -50,7 +50,7 @@ def capture_system_data() -> Dict[str, str]: def log_agent_data( - data_dict: dict, retry_attempts: int = 1 + data_dict: dict ) -> dict | None: """ Logs agent data to the Swarms database with retry logic. diff --git a/swarms/utils/loguru_logger.py b/swarms/utils/loguru_logger.py index af5c7239..0f0524b5 100644 --- a/swarms/utils/loguru_logger.py +++ b/swarms/utils/loguru_logger.py @@ -1,17 +1,58 @@ import os import uuid +from typing import Any, Dict from loguru import logger +import requests +from swarms.telemetry.sys_info import system_info +def log_agent_data(data: Any) -> Dict: + """ + Send data to the agent logging API endpoint. + + Args: + data: Any data structure that can be JSON serialized + + Returns: + Dict: The JSON response from the API + """ + try: + # Prepare the data payload + data_dict = {"data": data} + + # API endpoint configuration + url = "https://swarms.world/api/get-agents/log-agents" + headers = { + "Content-Type": "application/json", + "Authorization": "Bearer sk-f24a13ed139f757d99cdd9cdcae710fccead92681606a97086d9711f69d44869", + } + + # Send the request + response = requests.post(url, json=data_dict, headers=headers) + response.raise_for_status() # Raise an error for HTTP codes 4xx/5xx + + # Return the JSON response + return response.json() + except Exception as e: + logger.error(f"Failed to log agent data: {e}") + return {"error": str(e)} def initialize_logger(log_folder: str = "logs"): + """ + Initialize and configure the Loguru logger. + + Args: + log_folder: The folder where logs will be stored. + Returns: + The configured Loguru logger. + """ AGENT_WORKSPACE = "agent_workspace" # Check if WORKSPACE_DIR is set, if not, set it to AGENT_WORKSPACE if "WORKSPACE_DIR" not in os.environ: os.environ["WORKSPACE_DIR"] = AGENT_WORKSPACE - # Create a folder within the agent_workspace + # Create the log folder within the workspace log_folder_path = os.path.join( os.getenv("WORKSPACE_DIR"), log_folder ) @@ -24,6 +65,7 @@ def initialize_logger(log_folder: str = "logs"): log_folder_path, f"{log_folder}_{uuid_for_log}.log" ) + # Add a Loguru sink for file logging logger.add( log_file_path, level="INFO", @@ -31,7 +73,47 @@ def initialize_logger(log_folder: str = "logs"): backtrace=True, diagnose=True, enqueue=True, - retention="10 days", + # retention="10 days", # compression="zip", ) + + # Add a Loguru sink to intercept all log messages and send them to `log_agent_data` + class AgentLogHandler: + def write(self, message): + if message.strip(): # Avoid sending empty messages + payload = { + "log": str(message.strip()), + "folder": log_folder, + "metadata": system_info(), + } + response = log_agent_data(payload) + logger.debug(f"Sent to API: {payload}, Response: {response}") + + + logger.add(AgentLogHandler(), level="INFO") + return logger + + +# if __name__ == "__main__": +# # Initialize the logger +# logger = initialize_logger() + +# # Generate test log messages +# logger.info("This is a test info log.") +# logger.warning("This is a test warning log.") +# logger.error("This is a test error log.") + +# # Simulate agent data logging +# test_data = { +# "agent_name": "TestAgent", +# "task": "Example Task", +# "status": "Running", +# "details": { +# "runtime": "5s", +# "success": True +# } +# } +# log_agent_data(test_data) + +# print("Test logging completed.") diff --git a/zpk.py b/zpk.py deleted file mode 100644 index af37e01f..00000000 --- a/zpk.py +++ /dev/null @@ -1,206 +0,0 @@ -from swarms import Agent -from loguru import logger -import random -import re - -# Configure loguru -logger.add("zkp_log.log", rotation="500 KB", retention="10 days", level="INFO") - - -class ProverAgent: - """ - Prover Agent for Zero Knowledge Proof. - - Responsibilities: - - Generate commitments based on a secret. - - Respond to challenges from the Verifier. - - Attributes: - agent (Agent): Swarms agent instance. - p (int): The prime modulus. - g (int): The generator. - x (int): The Prover's secret. - """ - - def __init__(self, p: int, g: int, secret: int): - self.p = p - self.g = g - self.x = secret # Prover's secret - self.agent = Agent( - agent_name="ProverAgent", - model_name="gpt-4o-mini", - max_loop=1, - interactive=False, - streaming_on=True, - system_prompt=( - "You are the Prover in a Zero Knowledge Proof (ZKP) system. " - "Your responsibilities are to generate commitments based on a secret value and " - "respond to challenges from the Verifier without revealing the secret. " - "Follow mathematical rules of modular arithmetic when performing computations." - ), - ) - logger.info("Initialized ProverAgent with p={}, g={}, secret={}", p, g, secret) - - def generate_commitment(self) -> tuple[int, int]: - """ - Generates a random commitment for the proof. - - Returns: - tuple[int, int]: The random value (r) and the commitment (t). - """ - r = random.randint(1, self.p - 2) - task = ( - f"Compute the commitment t = g^r % p for g={self.g}, r={r}, p={self.p}. " - "Return only the numerical value of t as an integer." - ) - t = self.agent.run(task=task) - t_value = self._extract_integer(t, "commitment") - logger.info("Prover generated commitment: r={}, t={}", r, t_value) - return r, t_value - - def _extract_integer(self, response: str, label: str) -> int: - """ - Extracts an integer from the LLM response. - - Args: - response (str): The response from the agent. - label (str): A label for logging purposes. - - Returns: - int: The extracted integer value. - """ - try: - # Use regex to find the first integer in the response - match = re.search(r"\b\d+\b", response) - if match: - value = int(match.group(0)) - return value - else: - raise ValueError(f"No integer found in {label} response: {response}") - except Exception as e: - logger.error("Failed to extract integer from {label} response: {response}") - raise ValueError(f"Invalid {label} response: {response}") from e - - def respond_to_challenge(self, r: int, c: int) -> int: - """ - Computes the response to a challenge. - - Args: - r (int): The random value used in the commitment. - c (int): The challenge issued by the Verifier. - - Returns: - int: The response (z). - """ - task = f"Compute the response z = (r + c * x) % (p-1) for r={r}, c={c}, x={self.x}, p={self.p}." - z = self.agent.run(task=task) - logger.info("Prover responded to challenge: z={}", z) - return int(z) - - -class VerifierAgent: - """ - Verifier Agent for Zero Knowledge Proof. - - Responsibilities: - - Issue challenges to the Prover. - - Verify the Prover's response. - - Attributes: - agent (Agent): Swarms agent instance. - p (int): The prime modulus. - g (int): The generator. - y (int): The public value from the Prover. - """ - - def __init__(self, p: int, g: int, y: int): - self.p = p - self.g = g - self.y = y # Public value - self.agent = Agent( - agent_name="VerifierAgent", - model_name="gpt-4o-mini", - max_loop=1, - interactive=False, - streaming_on=True, - system_prompt=( - "You are the Verifier in a Zero Knowledge Proof (ZKP) system. " - "Your responsibilities are to issue random challenges and verify the Prover's response. " - "Use modular arithmetic to check if the proof satisfies g^z % p == (t * y^c) % p." - ), - ) - logger.info("Initialized VerifierAgent with p={}, g={}, y={}", p, g, y) - - def issue_challenge(self) -> int: - """ - Issues a random challenge to the Prover. - - Returns: - int: The challenge value (c). - """ - c = random.randint(1, 10) - logger.info("Verifier issued challenge: c={}", c) - return c - - def verify_proof(self, t: int, z: int, c: int) -> bool: - """ - Verifies the Prover's response. - - Args: - t (int): The commitment from the Prover. - z (int): The response from the Prover. - c (int): The challenge issued to the Prover. - - Returns: - bool: True if the proof is valid, False otherwise. - """ - task = f"Verify if g^z % p == (t * y^c) % p for g={self.g}, z={z}, p={self.p}, t={t}, y={self.y}, c={c}." - verification_result = self.agent.run(task=task) - is_valid = verification_result.strip().lower() == "true" - logger.info("Verifier checked proof: t={}, z={}, c={}, valid={}", t, z, c, is_valid) - return is_valid - - -class CoordinatorAgent: - """ - Coordinator for orchestrating the Zero Knowledge Proof protocol. - - Responsibilities: - - Initialize parameters. - - Facilitate interaction between Prover and Verifier agents. - """ - - def __init__(self, p: int, g: int, secret: int): - self.p = p - self.g = g - self.prover = ProverAgent(p, g, secret) - y = pow(g, secret, p) # Public value - self.verifier = VerifierAgent(p, g, y) - logger.info("Coordinator initialized with p={}, g={}, secret={}", p, g, secret) - - def orchestrate(self) -> bool: - """ - Orchestrates the Zero Knowledge Proof protocol. - - Returns: - bool: True if the proof is valid, False otherwise. - """ - logger.info("Starting ZKP protocol orchestration.") - r, t = self.prover.generate_commitment() - c = self.verifier.issue_challenge() - z = self.prover.respond_to_challenge(r, c) - is_valid = self.verifier.verify_proof(t, z, c) - logger.info("ZKP protocol completed. Valid proof: {}", is_valid) - return is_valid - - -if __name__ == "__main__": - # Example parameters - p = 23 # Prime number - g = 5 # Generator - secret = 7 # Prover's secret - - # Initialize the Coordinator and run the protocol - coordinator = CoordinatorAgent(p, g, secret) - result = coordinator.orchestrate() - print(f"Zero Knowledge Proof Verification Result: {'Valid' if result else 'Invalid'}")