From bb7e18f654855b362b0b07f1b5cd9b5d4b3c9cf4 Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Mon, 16 Dec 2024 18:14:38 -0800 Subject: [PATCH] [6.6.8] --- api/skypilot.yaml | 4 + example.py | 11 +- fastrag.py | 387 ------------ forex_agents.py | 554 +++++++++++++++++ pyproject.toml | 6 +- real_time.py | 618 ------------------- swarm_builder.py | 333 +++++++++++ swarm_router_example.py | 165 ----- swarms/structs/agent.py | 863 +++++++++++---------------- swarms/structs/rearrange.py | 1 - swarms/telemetry/bootup.py | 58 +- swarms/telemetry/capture_sys_data.py | 29 +- swarms/utils/loguru_logger.py | 7 +- 13 files changed, 1273 insertions(+), 1763 deletions(-) delete mode 100644 fastrag.py create mode 100644 forex_agents.py delete mode 100644 real_time.py create mode 100644 swarm_builder.py delete mode 100644 swarm_router_example.py diff --git a/api/skypilot.yaml b/api/skypilot.yaml index 5e1026c4..3524aa95 100644 --- a/api/skypilot.yaml +++ b/api/skypilot.yaml @@ -35,3 +35,7 @@ run: | # LOG_LEVEL: "INFO" # # MAX_WORKERS: "4" +# metadata: +# name: swarms-api-service +# version: "1.0.0" +# environment: production \ No newline at end of file diff --git a/example.py b/example.py index 6ba8a46c..76c23353 100644 --- a/example.py +++ b/example.py @@ -6,9 +6,10 @@ from swarms.prompts.finance_agent_sys_prompt import ( # Initialize the agent agent = Agent( agent_name="Financial-Analysis-Agent", - agent_description = "Personal finance advisor agent", - system_prompt=FINANCIAL_AGENT_SYS_PROMPT + "Output the token when you're done creating a portfolio of etfs, index, funds, and more for AI", - model_name="gpt-4o", # Use any model from litellm + agent_description="Personal finance advisor agent", + system_prompt=FINANCIAL_AGENT_SYS_PROMPT + + "Output the token when you're done creating a portfolio of etfs, index, funds, and more for AI", + model_name="gpt-4o", # Use any model from litellm max_loops="auto", dynamic_temperature_enabled=True, user_name="Kye", @@ -18,8 +19,8 @@ agent = Agent( return_step_meta=False, output_type="str", # "json", "dict", "csv" OR "string" "yaml" and auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task - max_tokens=16000, # max output tokens - interactive = True, + max_tokens=16000, # max output tokens + interactive=True, stopping_token="", execute_tool=True, ) diff --git a/fastrag.py b/fastrag.py deleted file mode 100644 index 20839bf1..00000000 --- a/fastrag.py +++ /dev/null @@ -1,387 +0,0 @@ -from typing import List, Dict, Optional, Union, Any -from dataclasses import dataclass -from pathlib import Path -import numpy as np -from scipy.sparse import csr_matrix -from sklearn.cluster import AgglomerativeClustering -from sentence_transformers import SentenceTransformer -import faiss -import pickle -import time -from loguru import logger -from concurrent.futures import ThreadPoolExecutor -import threading -import uuid - -@dataclass -class Document: - """Represents a document in the HQD-RAG system. 
- - Attributes: - id (str): Unique identifier for the document - content (str): Raw text content of the document - embedding (Optional[np.ndarray]): Quantum-inspired embedding vector - cluster_id (Optional[int]): ID of the cluster this document belongs to - """ - id: str - content: str - embedding: Optional[np.ndarray] = None - cluster_id: Optional[int] = None - -class HQDRAG: - """ - Hierarchical Quantum-Inspired Distributed RAG (HQD-RAG) System - - A production-grade implementation of the HQD-RAG algorithm for ultra-fast - and reliable document retrieval. Uses quantum-inspired embeddings and - hierarchical clustering for efficient search. - - Attributes: - embedding_dim (int): Dimension of the quantum-inspired embeddings - num_clusters (int): Number of hierarchical clusters - similarity_threshold (float): Threshold for quantum similarity matching - reliability_threshold (float): Threshold for reliability verification - """ - - def __init__( - self, - embedding_dim: int = 768, - num_clusters: int = 128, - similarity_threshold: float = 0.75, - reliability_threshold: float = 0.85, - model_name: str = "all-MiniLM-L6-v2" - ): - """Initialize the HQD-RAG system. - - Args: - embedding_dim: Dimension of document embeddings - num_clusters: Number of clusters for hierarchical organization - similarity_threshold: Minimum similarity score for retrieval - reliability_threshold: Minimum reliability score for verification - model_name: Name of the sentence transformer model to use - """ - logger.info(f"Initializing HQD-RAG with {embedding_dim} dimensions") - - self.embedding_dim = embedding_dim - self.num_clusters = num_clusters - self.similarity_threshold = similarity_threshold - self.reliability_threshold = reliability_threshold - - # Initialize components - self.documents: Dict[str, Document] = {} - self.encoder = SentenceTransformer(model_name) - self.index = faiss.IndexFlatIP(embedding_dim) # Inner product index - self.clustering = AgglomerativeClustering( - n_clusters=num_clusters, - metric='euclidean', - linkage='ward' - ) - - # Thread safety - self._lock = threading.Lock() - self._executor = ThreadPoolExecutor(max_workers=4) - - logger.info("HQD-RAG system initialized successfully") - - def _compute_quantum_embedding(self, text: str) -> np.ndarray: - """Compute quantum-inspired embedding for text. - - Args: - text: Input text to embed - - Returns: - Quantum-inspired embedding vector - """ - # Get base embedding - base_embedding = self.encoder.encode([text])[0] - - # Apply quantum-inspired transformation - # Simulate superposition by adding phase components - phase = np.exp(2j * np.pi * np.random.random(self.embedding_dim)) - quantum_embedding = base_embedding * phase - - # Normalize to unit length - return quantum_embedding / np.linalg.norm(quantum_embedding) - - def _verify_reliability(self, doc: Document, query_embedding: np.ndarray) -> float: - """Verify the reliability of a document match. 
- - Args: - doc: Document to verify - query_embedding: Query embedding vector - - Returns: - Reliability score between 0 and 1 - """ - if doc.embedding is None: - return 0.0 - - # Compute consistency score - consistency = np.abs(np.dot(doc.embedding, query_embedding)) - - # Add quantum noise resistance check - noise = np.random.normal(0, 0.1, self.embedding_dim) - noisy_query = query_embedding + noise - noisy_query = noisy_query / np.linalg.norm(noisy_query) - noise_resistance = np.abs(np.dot(doc.embedding, noisy_query)) - - return (consistency + noise_resistance) / 2 - - def add(self, content: str, doc_id: Optional[str] = None) -> str: - """Add a document to the system. - - Args: - content: Document text content - doc_id: Optional custom document ID - - Returns: - Document ID - """ - doc_id = doc_id or str(uuid.uuid4()) - - with self._lock: - try: - # Compute embedding - embedding = self._compute_quantum_embedding(content) - - # Create document - doc = Document( - id=doc_id, - content=content, - embedding=embedding - ) - - # Add to storage - self.documents[doc_id] = doc - self.index.add(embedding.reshape(1, -1)) - - # Update clustering - self._update_clusters() - - logger.info(f"Successfully added document {doc_id}") - return doc_id - - except Exception as e: - logger.error(f"Error adding document: {str(e)}") - raise - - def query( - self, - query: str, - k: int = 5, - return_scores: bool = False - ) -> Union[List[str], List[tuple[str, float]]]: - """Query the system for relevant documents. - - Args: - query: Query text - k: Number of results to return - return_scores: Whether to return similarity scores - - Returns: - List of document IDs or (document ID, score) tuples - """ - try: - # Compute query embedding - query_embedding = self._compute_quantum_embedding(query) - - # Search index - scores, indices = self.index.search( - query_embedding.reshape(1, -1), - k * 2 # Get extra results for reliability filtering - ) - - results = [] - for score, idx in zip(scores[0], indices[0]): - # Get document - doc_id = list(self.documents.keys())[idx] - doc = self.documents[doc_id] - - # Verify reliability - reliability = self._verify_reliability(doc, query_embedding) - - if reliability >= self.reliability_threshold: - results.append((doc_id, float(score))) - - if len(results) >= k: - break - - logger.info(f"Query returned {len(results)} results") - - if return_scores: - return results - return [doc_id for doc_id, _ in results] - - except Exception as e: - logger.error(f"Error processing query: {str(e)}") - raise - - def update(self, doc_id: str, new_content: str) -> None: - """Update an existing document. 
- - Args: - doc_id: ID of document to update - new_content: New document content - """ - with self._lock: - try: - if doc_id not in self.documents: - raise KeyError(f"Document {doc_id} not found") - - # Remove old embedding - old_doc = self.documents[doc_id] - if old_doc.embedding is not None: - self.index.remove_ids(np.array([list(self.documents.keys()).index(doc_id)])) - - # Compute new embedding - new_embedding = self._compute_quantum_embedding(new_content) - - # Update document - self.documents[doc_id] = Document( - id=doc_id, - content=new_content, - embedding=new_embedding - ) - - # Add new embedding - self.index.add(new_embedding.reshape(1, -1)) - - # Update clustering - self._update_clusters() - - logger.info(f"Successfully updated document {doc_id}") - - except Exception as e: - logger.error(f"Error updating document: {str(e)}") - raise - - def delete(self, doc_id: str) -> None: - """Delete a document from the system. - - Args: - doc_id: ID of document to delete - """ - with self._lock: - try: - if doc_id not in self.documents: - raise KeyError(f"Document {doc_id} not found") - - # Remove from index - idx = list(self.documents.keys()).index(doc_id) - self.index.remove_ids(np.array([idx])) - - # Remove from storage - del self.documents[doc_id] - - # Update clustering - self._update_clusters() - - logger.info(f"Successfully deleted document {doc_id}") - - except Exception as e: - logger.error(f"Error deleting document: {str(e)}") - raise - - def _update_clusters(self) -> None: - """Update hierarchical document clusters.""" - if len(self.documents) < 2: - return - - # Get all embeddings - embeddings = np.vstack([ - doc.embedding for doc in self.documents.values() - if doc.embedding is not None - ]) - - # Update clustering - clusters = self.clustering.fit_predict(embeddings) - - # Assign cluster IDs - for doc, cluster_id in zip(self.documents.values(), clusters): - doc.cluster_id = int(cluster_id) - - def save(self, path: Union[str, Path]) -> None: - """Save the system state to disk. - - Args: - path: Path to save directory - """ - path = Path(path) - path.mkdir(parents=True, exist_ok=True) - - try: - # Save documents - with open(path / "documents.pkl", "wb") as f: - pickle.dump(self.documents, f) - - # Save index - faiss.write_index(self.index, str(path / "index.faiss")) - - logger.info(f"Successfully saved system state to {path}") - - except Exception as e: - logger.error(f"Error saving system state: {str(e)}") - raise - - def load(self, path: Union[str, Path]) -> None: - """Load the system state from disk. 
- - Args: - path: Path to save directory - """ - path = Path(path) - - try: - # Load documents - with open(path / "documents.pkl", "rb") as f: - self.documents = pickle.load(f) - - # Load index - self.index = faiss.read_index(str(path / "index.faiss")) - - logger.info(f"Successfully loaded system state from {path}") - - except Exception as e: - logger.error(f"Error loading system state: {str(e)}") - raise - -# Example usage: -if __name__ == "__main__": - # Configure logging - logger.add( - "hqd_rag.log", - rotation="1 day", - retention="1 week", - level="INFO" - ) - - # Initialize system - rag = HQDRAG() - - # Add some documents - doc_ids = [] - docs = [ - "The quick brown fox jumps over the lazy dog", - "Machine learning is a subset of artificial intelligence", - "Python is a popular programming language" - ] - - for doc in docs: - doc_id = rag.add(doc) - doc_ids.append(doc_id) - - # Query - results = rag.query("What is machine learning?", return_scores=True) - print("Query results:", results) - - # # Update a document - # rag.update(doc_ids[0], "The fast brown fox jumps over the sleepy dog") - - # # Delete a document - # rag.delete(doc_ids[-1]) - - # # Save state - # rag.save("hqd_rag_state") - - - \ No newline at end of file diff --git a/forex_agents.py b/forex_agents.py new file mode 100644 index 00000000..1da5e896 --- /dev/null +++ b/forex_agents.py @@ -0,0 +1,554 @@ +from typing import Dict, List +from datetime import datetime +from loguru import logger +from swarms.structs.tree_swarm import TreeAgent, Tree, ForestSwarm +import asyncio +import json +import aiohttp +from bs4 import BeautifulSoup +import xml.etree.ElementTree as ET + +# Configure logging +logger.add("forex_forest.log", rotation="500 MB", level="INFO") + + +class ForexDataFeed: + """Real-time forex data collector using free open sources""" + + def __init__(self): + self.pairs = [ + "EUR/USD", + "GBP/USD", + "USD/JPY", + "AUD/USD", + "USD/CAD", + ] + + async def fetch_ecb_rates(self) -> Dict: + """Fetch exchange rates from European Central Bank (no key required)""" + try: + url = "https://www.ecb.europa.eu/stats/eurofxref/eurofxref-daily.xml" + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + xml_data = await response.text() + + root = ET.fromstring(xml_data) + rates = {} + for cube in root.findall(".//*[@currency]"): + currency = cube.get("currency") + rate = float(cube.get("rate")) + rates[currency] = rate + + # Calculate cross rates + rates["EUR"] = 1.0 # Base currency + cross_rates = {} + for pair in self.pairs: + base, quote = pair.split("/") + if base in rates and quote in rates: + cross_rates[pair] = rates[base] / rates[quote] + + return cross_rates + except Exception as e: + logger.error(f"Error fetching ECB rates: {e}") + return {} + + async def fetch_forex_factory_data(self) -> Dict: + """Scrape trading data from Forex Factory""" + try: + url = "https://www.forexfactory.com" + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" + } + + async with aiohttp.ClientSession() as session: + async with session.get( + url, headers=headers + ) as response: + text = await response.text() + + soup = BeautifulSoup(text, "html.parser") + + # Get calendar events + calendar = [] + calendar_table = soup.find( + "table", class_="calendar__table" + ) + if calendar_table: + for row in calendar_table.find_all( + "tr", class_="calendar__row" + ): + try: + event = { + "currency": row.find( + "td", class_="calendar__currency" + ).text.strip(), + "event": 
row.find( + "td", class_="calendar__event" + ).text.strip(), + "impact": row.find( + "td", class_="calendar__impact" + ).text.strip(), + "time": row.find( + "td", class_="calendar__time" + ).text.strip(), + } + calendar.append(event) + except: + continue + + return {"calendar": calendar} + except Exception as e: + logger.error(f"Error fetching Forex Factory data: {e}") + return {} + + async def fetch_tradingeconomics_data(self) -> Dict: + """Scrape economic data from Trading Economics""" + try: + url = "https://tradingeconomics.com/calendar" + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" + } + + async with aiohttp.ClientSession() as session: + async with session.get( + url, headers=headers + ) as response: + text = await response.text() + + soup = BeautifulSoup(text, "html.parser") + + # Get economic indicators + indicators = [] + calendar_table = soup.find("table", class_="table") + if calendar_table: + for row in calendar_table.find_all("tr")[ + 1: + ]: # Skip header + try: + cols = row.find_all("td") + indicator = { + "country": cols[0].text.strip(), + "indicator": cols[1].text.strip(), + "actual": cols[2].text.strip(), + "previous": cols[3].text.strip(), + "consensus": cols[4].text.strip(), + } + indicators.append(indicator) + except: + continue + + return {"indicators": indicators} + except Exception as e: + logger.error( + f"Error fetching Trading Economics data: {e}" + ) + return {} + + async def fetch_dailyfx_data(self) -> Dict: + """Scrape market analysis from DailyFX""" + try: + url = "https://www.dailyfx.com/market-news" + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" + } + + async with aiohttp.ClientSession() as session: + async with session.get( + url, headers=headers + ) as response: + text = await response.text() + + soup = BeautifulSoup(text, "html.parser") + + # Get market news and analysis + news = [] + articles = soup.find_all("article", class_="dfx-article") + for article in articles[:10]: # Get latest 10 articles + try: + news_item = { + "title": article.find("h3").text.strip(), + "summary": article.find("p").text.strip(), + "currency": article.get( + "data-currency", "General" + ), + "timestamp": article.find("time").get( + "datetime" + ), + } + news.append(news_item) + except: + continue + + return {"news": news} + except Exception as e: + logger.error(f"Error fetching DailyFX data: {e}") + return {} + + async def fetch_all_data(self) -> Dict: + """Fetch and combine all forex data sources""" + try: + # Fetch data from all sources concurrently + rates, ff_data, te_data, dx_data = await asyncio.gather( + self.fetch_ecb_rates(), + self.fetch_forex_factory_data(), + self.fetch_tradingeconomics_data(), + self.fetch_dailyfx_data(), + ) + + # Combine all data + market_data = { + "exchange_rates": rates, + "calendar": ff_data.get("calendar", []), + "economic_indicators": te_data.get("indicators", []), + "market_news": dx_data.get("news", []), + "timestamp": datetime.now().isoformat(), + } + + return market_data + + except Exception as e: + logger.error(f"Error fetching all data: {e}") + return {} + + +# Rest of the ForexForestSystem class remains the same... + +# (Previous ForexDataFeed class code remains the same...) + +# Specialized Agent Prompts +TECHNICAL_ANALYST_PROMPT = """You are an expert forex technical analyst agent. +Your responsibilities: +1. Analyze real-time exchange rate data for patterns and trends +2. Calculate cross-rates and currency correlations +3. 
Generate trading signals based on price action +4. Monitor market volatility and momentum +5. Identify key support and resistance levels + +Data Format: +- You will receive exchange rates from ECB and calculated cross-rates +- Focus on major currency pairs and their relationships +- Consider market volatility and trading volumes + +Output Format: +{ + "analysis_type": "technical", + "timestamp": "ISO timestamp", + "signals": [ + { + "pair": "Currency pair", + "trend": "bullish/bearish/neutral", + "strength": 1-10, + "key_levels": {"support": [], "resistance": []}, + "recommendation": "buy/sell/hold" + } + ] +}""" + +FUNDAMENTAL_ANALYST_PROMPT = """You are an expert forex fundamental analyst agent. +Your responsibilities: +1. Analyze economic calendar events and their impact +2. Evaluate economic indicators from Trading Economics +3. Assess market news and sentiment from DailyFX +4. Monitor central bank actions and policies +5. Track geopolitical events affecting currencies + +Data Format: +- Economic calendar events with impact levels +- Latest economic indicators and previous values +- Market news and analysis from reliable sources +- Central bank statements and policy changes + +Output Format: +{ + "analysis_type": "fundamental", + "timestamp": "ISO timestamp", + "assessments": [ + { + "currency": "Currency code", + "economic_outlook": "positive/negative/neutral", + "key_events": [], + "impact_score": 1-10, + "bias": "bullish/bearish/neutral" + } + ] +}""" + +MARKET_SENTIMENT_PROMPT = """You are an expert market sentiment analysis agent. +Your responsibilities: +1. Analyze news sentiment from DailyFX articles +2. Track market positioning and bias +3. Monitor risk sentiment and market fear/greed +4. Identify potential market drivers +5. Detect sentiment shifts and extremes + +Data Format: +- Market news and analysis articles +- Trading sentiment indicators +- Risk event calendar +- Market commentary and analysis + +Output Format: +{ + "analysis_type": "sentiment", + "timestamp": "ISO timestamp", + "sentiment_data": [ + { + "pair": "Currency pair", + "sentiment": "risk-on/risk-off", + "strength": 1-10, + "key_drivers": [], + "outlook": "positive/negative/neutral" + } + ] +}""" + +STRATEGY_COORDINATOR_PROMPT = """You are the lead forex strategy coordination agent. +Your responsibilities: +1. Synthesize technical, fundamental, and sentiment analysis +2. Generate final trading recommendations +3. Manage risk exposure and position sizing +4. Coordinate entry and exit points +5. 
Monitor open positions and adjust strategies + +Data Format: +- Analysis from technical, fundamental, and sentiment agents +- Current market rates and conditions +- Economic calendar and news events +- Risk parameters and exposure limits + +Output Format: +{ + "analysis_type": "strategy", + "timestamp": "ISO timestamp", + "recommendations": [ + { + "pair": "Currency pair", + "action": "buy/sell/hold", + "confidence": 1-10, + "entry_points": [], + "stop_loss": float, + "take_profit": float, + "rationale": "string" + } + ] +}""" + + +class ForexForestSystem: + """Main system coordinating the forest swarm and data feeds""" + + def __init__(self): + """Initialize the forex forest system""" + self.data_feed = ForexDataFeed() + + # Create Technical Analysis Tree + technical_agents = [ + TreeAgent( + system_prompt=TECHNICAL_ANALYST_PROMPT, + agent_name="Price Action Analyst", + model_name="gpt-4o", + ), + TreeAgent( + system_prompt=TECHNICAL_ANALYST_PROMPT, + agent_name="Cross Rate Analyst", + model_name="gpt-4o", + ), + TreeAgent( + system_prompt=TECHNICAL_ANALYST_PROMPT, + agent_name="Volatility Analyst", + model_name="gpt-4o", + ), + ] + + # Create Fundamental Analysis Tree + fundamental_agents = [ + TreeAgent( + system_prompt=FUNDAMENTAL_ANALYST_PROMPT, + agent_name="Economic Data Analyst", + model_name="gpt-4o", + ), + TreeAgent( + system_prompt=FUNDAMENTAL_ANALYST_PROMPT, + agent_name="News Impact Analyst", + model_name="gpt-4o", + ), + TreeAgent( + system_prompt=FUNDAMENTAL_ANALYST_PROMPT, + agent_name="Central Bank Analyst", + model_name="gpt-4o", + ), + ] + + # Create Sentiment Analysis Tree + sentiment_agents = [ + TreeAgent( + system_prompt=MARKET_SENTIMENT_PROMPT, + agent_name="News Sentiment Analyst", + model_name="gpt-4o", + ), + TreeAgent( + system_prompt=MARKET_SENTIMENT_PROMPT, + agent_name="Risk Sentiment Analyst", + model_name="gpt-4o", + ), + TreeAgent( + system_prompt=MARKET_SENTIMENT_PROMPT, + agent_name="Market Positioning Analyst", + model_name="gpt-4o", + ), + ] + + # Create Strategy Coordination Tree + strategy_agents = [ + TreeAgent( + system_prompt=STRATEGY_COORDINATOR_PROMPT, + agent_name="Lead Strategy Coordinator", + model_name="gpt-4", + temperature=0.5, + ), + TreeAgent( + system_prompt=STRATEGY_COORDINATOR_PROMPT, + agent_name="Risk Manager", + model_name="gpt-4", + temperature=0.5, + ), + TreeAgent( + system_prompt=STRATEGY_COORDINATOR_PROMPT, + agent_name="Position Manager", + model_name="gpt-4", + temperature=0.5, + ), + ] + + # Create trees + self.technical_tree = Tree( + tree_name="Technical Analysis", agents=technical_agents + ) + self.fundamental_tree = Tree( + tree_name="Fundamental Analysis", + agents=fundamental_agents, + ) + self.sentiment_tree = Tree( + tree_name="Sentiment Analysis", agents=sentiment_agents + ) + self.strategy_tree = Tree( + tree_name="Strategy Coordination", agents=strategy_agents + ) + + # Create forest swarm + self.forest = ForestSwarm( + trees=[ + self.technical_tree, + self.fundamental_tree, + self.sentiment_tree, + self.strategy_tree, + ] + ) + + logger.info("Forex Forest System initialized successfully") + + async def prepare_analysis_task(self) -> str: + """Prepare the analysis task with real-time data""" + try: + market_data = await self.data_feed.fetch_all_data() + + task = { + "action": "analyze_forex_markets", + "market_data": market_data, + "timestamp": datetime.now().isoformat(), + "analysis_required": [ + "technical", + "fundamental", + "sentiment", + "strategy", + ], + } + + return json.dumps(task, indent=2) + + 
except Exception as e: + logger.error(f"Error preparing analysis task: {e}") + raise + + async def run_analysis_cycle(self) -> Dict: + """Run a complete analysis cycle with the forest swarm""" + try: + # Prepare task with real-time data + task = await self.prepare_analysis_task() + + # Run forest swarm analysis + result = self.forest.run(task) + + # Parse and validate results + analysis = ( + json.loads(result) + if isinstance(result, str) + else result + ) + + logger.info("Analysis cycle completed successfully") + return analysis + + except Exception as e: + logger.error(f"Error in analysis cycle: {e}") + raise + + async def monitor_markets(self, interval_seconds: int = 300): + """Continuously monitor markets and run analysis""" + while True: + try: + # Run analysis cycle + analysis = await self.run_analysis_cycle() + + # Log results + logger.info("Market analysis completed") + logger.debug( + f"Analysis results: {json.dumps(analysis, indent=2)}" + ) + + # Process any trading signals + if "recommendations" in analysis: + await self.process_trading_signals( + analysis["recommendations"] + ) + + # Wait for next interval + await asyncio.sleep(interval_seconds) + + except Exception as e: + logger.error(f"Error in market monitoring: {e}") + await asyncio.sleep(60) + + async def process_trading_signals( + self, recommendations: List[Dict] + ): + """Process and log trading signals from analysis""" + try: + for rec in recommendations: + logger.info( + f"Trading Signal: {rec['pair']} - {rec['action']}" + ) + logger.info(f"Confidence: {rec['confidence']}/10") + logger.info(f"Entry Points: {rec['entry_points']}") + logger.info(f"Stop Loss: {rec['stop_loss']}") + logger.info(f"Take Profit: {rec['take_profit']}") + logger.info(f"Rationale: {rec['rationale']}") + logger.info("-" * 50) + + except Exception as e: + logger.error(f"Error processing trading signals: {e}") + + +# Example usage +async def main(): + """Main function to run the Forex Forest System""" + try: + system = ForexForestSystem() + await system.monitor_markets() + except Exception as e: + logger.error(f"Error in main: {e}") + + +if __name__ == "__main__": + # Set up asyncio event loop and run the system + asyncio.run(main()) diff --git a/pyproject.toml b/pyproject.toml index fefed479..12d6acad 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "6.5.7" +version = "6.6.8" description = "Swarms - TGSC" license = "MIT" authors = ["Kye Gomez "] @@ -63,10 +63,10 @@ asyncio = ">=3.4.3,<4.0" toml = "*" pypdf = "5.1.0" loguru = "*" -pydantic = "2.8.2" +pydantic = "*" tenacity = "*" psutil = "*" -sentry-sdk = {version = "*", extras = ["http"]} # Updated here +sentry-sdk = "*" python-dotenv = "*" PyYAML = "*" docstring_parser = "0.16" diff --git a/real_time.py b/real_time.py deleted file mode 100644 index fe55878d..00000000 --- a/real_time.py +++ /dev/null @@ -1,618 +0,0 @@ -import torch -from torch.utils.data import DataLoader, TensorDataset -import numpy as np -from loguru import logger - -from dataclasses import dataclass -from typing import Optional, Tuple, Dict -import math -import torch.nn as nn -import torch.nn.functional as F -from torch import Tensor - - -@dataclass -class TransformerConfig: - """Configuration class for MoE Transformer model parameters.""" - - vocab_size: int = 50257 - hidden_size: int = 768 - num_attention_heads: int = 12 - num_expert_layers: int = 4 - num_experts: int = 8 - expert_capacity: int = 32 - 
max_position_embeddings: int = 1024 - dropout_prob: float = 0.1 - layer_norm_epsilon: float = 1e-5 - initializer_range: float = 0.02 - num_query_groups: int = 4 # For multi-query attention - - -class ExpertLayer(nn.Module): - """Individual expert neural network.""" - - def __init__(self, config: TransformerConfig): - super().__init__() - self.fc1 = nn.Linear( - config.hidden_size, 4 * config.hidden_size - ) - self.fc2 = nn.Linear( - 4 * config.hidden_size, config.hidden_size - ) - self.activation = nn.GELU() - self.dropout = nn.Dropout(config.dropout_prob) - - def forward(self, x: Tensor) -> Tensor: - x = self.fc1(x) - x = self.activation(x) - x = self.dropout(x) - x = self.fc2(x) - return x - - -class MixtureOfExperts(nn.Module): - """Mixture of Experts layer with dynamic routing.""" - - def __init__(self, config: TransformerConfig): - super().__init__() - self.num_experts = config.num_experts - self.expert_capacity = config.expert_capacity - - # Create expert networks - self.experts = nn.ModuleList( - [ExpertLayer(config) for _ in range(config.num_experts)] - ) - - # Router network - self.router = nn.Linear( - config.hidden_size, config.num_experts - ) - - def forward(self, x: Tensor) -> Tuple[Tensor, Dict]: - """Route inputs to experts and combine outputs.""" - batch_size, seq_len, hidden_size = x.shape - - # Calculate routing probabilities - router_logits = self.router(x) - routing_weights = F.softmax(router_logits, dim=-1) - - # Select top-k experts - top_k = 2 - gates, indices = torch.topk(routing_weights, top_k, dim=-1) - gates = F.softmax(gates, dim=-1) - - # Process inputs through selected experts - final_output = torch.zeros_like(x) - router_load = torch.zeros(self.num_experts, device=x.device) - - for i in range(top_k): - expert_index = indices[..., i] - gate = gates[..., i : i + 1] - - # Count expert assignments - for j in range(self.num_experts): - router_load[j] += (expert_index == j).float().sum() - - # Process through selected experts - for j in range(self.num_experts): - mask = expert_index == j - if not mask.any(): - continue - - expert_input = x[mask] - expert_output = self.experts[j](expert_input) - final_output[mask] += gate[mask] * expert_output - - aux_loss = router_load.float().var() / ( - router_load.float().mean() ** 2 - ) - - return final_output, {"load_balancing_loss": aux_loss} - - -class MultiQueryAttention(nn.Module): - """Multi-Query Attention mechanism with proper multi-query group handling.""" - - def __init__(self, config: TransformerConfig): - super().__init__() - self.num_attention_heads = config.num_attention_heads - self.num_query_groups = config.num_query_groups - self.hidden_size = config.hidden_size - self.head_dim = ( - config.hidden_size // config.num_attention_heads - ) - - # Query projection maintains full head dimension - self.q_proj = nn.Linear( - config.hidden_size, config.hidden_size - ) - - # Key and value projections use reduced number of heads (query groups) - self.k_proj = nn.Linear( - config.hidden_size, - self.head_dim * config.num_query_groups, - ) - self.v_proj = nn.Linear( - config.hidden_size, - self.head_dim * config.num_query_groups, - ) - - self.dropout = nn.Dropout(config.dropout_prob) - - # Calculate heads per group for proper reshaping - self.heads_per_group = ( - self.num_attention_heads // self.num_query_groups - ) - - def forward( - self, - hidden_states: Tensor, - attention_mask: Optional[Tensor] = None, - cache: Optional[Dict[str, Tensor]] = None, - ) -> Tuple[Tensor, Optional[Dict[str, Tensor]]]: - batch_size, 
seq_length, _ = hidden_states.shape - - # Project queries, keys, and values - queries = self.q_proj(hidden_states) - keys = self.k_proj(hidden_states) - values = self.v_proj(hidden_states) - - # Reshape queries to full number of heads - queries = queries.view( - batch_size, - seq_length, - self.num_attention_heads, - self.head_dim, - ) - - # Reshape keys and values to number of query groups - keys = keys.view( - batch_size, - seq_length, - self.num_query_groups, - self.head_dim, - ) - values = values.view( - batch_size, - seq_length, - self.num_query_groups, - self.head_dim, - ) - - # Transpose for batch matrix multiplication - queries = queries.transpose( - 1, 2 - ) # (batch, n_heads, seq_len, head_dim) - keys = keys.transpose( - 1, 2 - ) # (batch, n_groups, seq_len, head_dim) - values = values.transpose( - 1, 2 - ) # (batch, n_groups, seq_len, head_dim) - - # Repeat keys and values for each head in the group - keys = keys.repeat_interleave(self.heads_per_group, dim=1) - values = values.repeat_interleave(self.heads_per_group, dim=1) - - # Compute attention scores - scale = 1.0 / math.sqrt(self.head_dim) - scores = torch.matmul(queries, keys.transpose(-2, -1)) * scale - - if attention_mask is not None: - # Expand attention mask to match scores dimensions - expanded_mask = attention_mask.unsqueeze(1).unsqueeze(2) - expanded_mask = expanded_mask.expand( - batch_size, - self.num_attention_heads, - seq_length, - seq_length, - ) - mask_value = torch.finfo(scores.dtype).min - attention_mask = expanded_mask.eq(0).float() * mask_value - scores = scores + attention_mask - - attention_weights = F.softmax(scores, dim=-1) - attention_weights = self.dropout(attention_weights) - - # Compute attention output - attention_output = torch.matmul(attention_weights, values) - attention_output = attention_output.transpose(1, 2) - attention_output = attention_output.reshape( - batch_size, seq_length, -1 - ) - - return attention_output, None - - -class MoETransformer(nn.Module): - """ - Production-grade Transformer model with Mixture of Experts and Multi-Query Attention. 
- - Features: - - Multi-Query Attention mechanism for efficient inference - - Mixture of Experts for dynamic routing and specialization - - Real-time weight updates based on input similarity - - Built-in logging and monitoring - - Type annotations for better code maintainability - """ - - def __init__(self, config: TransformerConfig): - super().__init__() - self.config = config - - # Initialize components - self.embedding = nn.Embedding( - config.vocab_size, config.hidden_size - ) - self.position_embedding = nn.Embedding( - config.max_position_embeddings, config.hidden_size - ) - - # Multi-Query Attention layers - self.attention_layers = nn.ModuleList( - [ - MultiQueryAttention(config) - for _ in range(config.num_expert_layers) - ] - ) - - # Mixture of Experts layers - self.moe_layers = nn.ModuleList( - [ - MixtureOfExperts(config) - for _ in range(config.num_expert_layers) - ] - ) - - # Layer normalization and dropout - self.layer_norm = nn.LayerNorm( - config.hidden_size, eps=config.layer_norm_epsilon - ) - self.dropout = nn.Dropout(config.dropout_prob) - - # Output projection - self.output_projection = nn.Linear( - config.hidden_size, config.vocab_size - ) - - # Initialize weights - self.apply(self._init_weights) - logger.info("Initialized MoETransformer model") - - def _init_weights(self, module: nn.Module): - """Initialize model weights.""" - if isinstance(module, (nn.Linear, nn.Embedding)): - module.weight.data.normal_( - mean=0.0, std=self.config.initializer_range - ) - if ( - isinstance(module, nn.Linear) - and module.bias is not None - ): - module.bias.data.zero_() - - def get_position_embeddings(self, position_ids: Tensor) -> Tensor: - """Generate position embeddings.""" - return self.position_embedding(position_ids) - - def forward( - self, - input_ids: Tensor, - attention_mask: Optional[Tensor] = None, - position_ids: Optional[Tensor] = None, - cache: Optional[Dict[str, Tensor]] = None, - ) -> Tuple[Tensor, Dict]: - """ - Forward pass through the model. - - Args: - input_ids: Input token IDs - attention_mask: Attention mask for padding - position_ids: Position IDs for positioning encoding - cache: Cache for key/value states in generation - - Returns: - tuple: (logits, auxiliary_outputs) - """ - batch_size, seq_length = input_ids.shape - - if position_ids is None: - position_ids = torch.arange( - seq_length, dtype=torch.long, device=input_ids.device - ) - position_ids = position_ids.unsqueeze(0).expand_as( - input_ids - ) - - # Get embeddings - inputs_embeds = self.embedding(input_ids) - position_embeds = self.get_position_embeddings(position_ids) - hidden_states = inputs_embeds + position_embeds - - # Initialize auxiliary outputs - aux_outputs = {"moe_losses": []} - - # Process through transformer layers - for attention_layer, moe_layer in zip( - self.attention_layers, self.moe_layers - ): - # Multi-Query Attention - attention_output, _ = attention_layer( - hidden_states, attention_mask, cache - ) - hidden_states = self.layer_norm( - hidden_states + attention_output - ) - - # Mixture of Experts - moe_output, moe_aux = moe_layer(hidden_states) - hidden_states = self.layer_norm( - hidden_states + moe_output - ) - aux_outputs["moe_losses"].append( - moe_aux["load_balancing_loss"] - ) - - # Final output projection - logits = self.output_projection(hidden_states) - - return logits, aux_outputs - - def fetch_loss( - self, - logits: Tensor, - labels: Tensor, - aux_outputs: Dict, - reduction: str = "mean", - ) -> Tensor: - """ - Calculate the total loss including MoE balancing losses. 
- - Args: - logits: Model output logits - labels: Ground truth labels - aux_outputs: Auxiliary outputs from forward pass - reduction: Loss reduction method - - Returns: - Tensor: Total loss - """ - # Calculate cross entropy loss - ce_loss = F.cross_entropy( - logits.view(-1, self.config.vocab_size), - labels.view(-1), - reduction=reduction, - ) - - # Calculate MoE loss - moe_loss = torch.stack(aux_outputs["moe_losses"]).mean() - - # Combine losses - total_loss = ce_loss + 0.01 * moe_loss - - logger.debug( - f"CE Loss: {ce_loss.item():.4f}, " - f"MoE Loss: {moe_loss.item():.4f}" - ) - - return total_loss - - @torch.no_grad() - def generate( - self, - input_ids: Tensor, - max_length: int = 100, - temperature: float = 1.0, - top_k: int = 50, - top_p: float = 0.9, - ) -> Tensor: - """ - Generate text using the model. - - Args: - input_ids: Initial input tokens - max_length: Maximum sequence length to generate - temperature: Sampling temperature - top_k: Number of highest probability tokens to keep - top_p: Cumulative probability for nucleus sampling - - Returns: - Tensor: Generated token IDs - """ - batch_size = input_ids.shape[0] - device = input_ids.device - - # Initialize sequence with input_ids - generated = input_ids - - # Cache for key-value pairs - cache = {} - - for _ in range(max_length): - # Get position IDs for current sequence - position_ids = torch.arange( - generated.shape[1], dtype=torch.long, device=device - ) - position_ids = position_ids.unsqueeze(0).expand( - batch_size, -1 - ) - - # Forward pass - logits, _ = self.forward( - generated, position_ids=position_ids, cache=cache - ) - - # Get next token logits - next_token_logits = logits[:, -1, :] / temperature - - # Apply top-k filtering - if top_k > 0: - indices_to_remove = ( - next_token_logits - < torch.topk(next_token_logits, top_k)[0][ - ..., -1, None - ] - ) - next_token_logits[indices_to_remove] = float("-inf") - - # Apply top-p (nucleus) filtering - if top_p < 1.0: - sorted_logits, sorted_indices = torch.sort( - next_token_logits, descending=True - ) - cumulative_probs = torch.cumsum( - F.softmax(sorted_logits, dim=-1), dim=-1 - ) - - # Remove tokens with cumulative probability above the threshold - sorted_indices_to_remove = cumulative_probs > top_p - sorted_indices_to_remove[..., 1:] = ( - sorted_indices_to_remove[..., :-1].clone() - ) - sorted_indices_to_remove[..., 0] = 0 - - indices_to_remove = sorted_indices[ - sorted_indices_to_remove - ] - next_token_logits[indices_to_remove] = float("-inf") - - # Sample next token - probs = F.softmax(next_token_logits, dim=-1) - next_token = torch.multinomial(probs, num_samples=1) - - # Append next token to sequence - generated = torch.cat((generated, next_token), dim=1) - - # Check for end of sequence token - if (next_token == self.config.vocab_size - 1).all(): - break - - return generated - - -# Initialize model configuration -config = TransformerConfig( - vocab_size=50257, - hidden_size=768, - num_attention_heads=12, - num_expert_layers=4, - num_experts=8, - expert_capacity=32, - max_position_embeddings=1024, - num_query_groups=4, -) - - -def prepare_sample_data( - batch_size: int = 8, - seq_length: int = 512, - vocab_size: int = 50257, -) -> DataLoader: - """Create sample data for demonstration.""" - # Create random input sequences - input_ids = torch.randint( - 0, vocab_size, (100, seq_length) # 100 samples - ) - - # Create target sequences (shifted by 1) - labels = torch.randint(0, vocab_size, (100, seq_length)) - - # Create attention masks (1 for real tokens, 0 for 
padding) - attention_mask = torch.ones_like(input_ids) - - # Create dataset and dataloader - dataset = TensorDataset(input_ids, attention_mask, labels) - dataloader = DataLoader( - dataset, batch_size=batch_size, shuffle=True - ) - - return dataloader - - -def train_step( - model: MoETransformer, - batch: tuple, - optimizer: torch.optim.Optimizer, - device: str = "cuda" if torch.cuda.is_available() else "cpu", -) -> float: - """Execute single training step.""" - model.train() - optimizer.zero_grad() - - # Unpack batch - input_ids, attention_mask, labels = [b.to(device) for b in batch] - - # Forward pass - logits, aux_outputs = model( - input_ids=input_ids, attention_mask=attention_mask - ) - - # Calculate loss - loss = model.fetch_loss(logits, labels, aux_outputs) - - # Backward pass - loss.backward() - optimizer.step() - - return loss.item() - - -def main(): - # Set device - device = "cuda" if torch.cuda.is_available() else "cpu" - logger.info(f"Using device: {device}") - - # Initialize model - model = MoETransformer(config).to(device) - logger.info("Model initialized") - - # Setup optimizer - optimizer = torch.optim.AdamW( - model.parameters(), lr=1e-4, weight_decay=0.01 - ) - - # Prepare data - dataloader = prepare_sample_data() - logger.info("Data prepared") - - # Training loop - num_epochs = 3 - for epoch in range(num_epochs): - epoch_losses = [] - - for batch_idx, batch in enumerate(dataloader): - loss = train_step(model, batch, optimizer, device) - epoch_losses.append(loss) - - if batch_idx % 10 == 0: - logger.info( - f"Epoch {epoch+1}/{num_epochs} " - f"Batch {batch_idx}/{len(dataloader)} " - f"Loss: {loss:.4f}" - ) - - avg_loss = np.mean(epoch_losses) - logger.info(f"Epoch {epoch+1} average loss: {avg_loss:.4f}") - - # Generation example - model.eval() - with torch.no_grad(): - # Prepare input prompt - prompt = torch.randint(0, config.vocab_size, (1, 10)).to( - device - ) - - # Generate sequence - generated = model.generate( - input_ids=prompt, - max_length=50, - temperature=0.7, - top_k=50, - top_p=0.9, - ) - - logger.info(f"Generated sequence shape: {generated.shape}") - - -if __name__ == "__main__": - main() diff --git a/swarm_builder.py b/swarm_builder.py new file mode 100644 index 00000000..f1d769b4 --- /dev/null +++ b/swarm_builder.py @@ -0,0 +1,333 @@ +import os +from typing import List, Optional +from datetime import datetime + +from pydantic import BaseModel, Field +from pydantic.v1 import validator +from loguru import logger +from tenacity import ( + retry, + stop_after_attempt, + wait_exponential, +) + +from swarm_models import OpenAIFunctionCaller, OpenAIChat +from swarms.structs.agent import Agent +from swarms.structs.swarm_router import SwarmRouter +from swarms.structs.agents_available import showcase_available_agents + + +BOSS_SYSTEM_PROMPT = """ +Manage a swarm of worker agents to efficiently serve the user by deciding whether to create new agents or delegate tasks. Ensure operations are efficient and effective. + +### Instructions: + +1. **Task Assignment**: + - Analyze available worker agents when a task is presented. + - Delegate tasks to existing agents with clear, direct, and actionable instructions if an appropriate agent is available. + - If no suitable agent exists, create a new agent with a fitting system prompt to handle the task. + +2. **Agent Creation**: + - Name agents according to the task they are intended to perform (e.g., "Twitter Marketing Agent"). 
+ - Provide each new agent with a concise and clear system prompt that includes its role, objectives, and any tools it can utilize. + +3. **Efficiency**: + - Minimize redundancy and maximize task completion speed. + - Avoid unnecessary agent creation if an existing agent can fulfill the task. + +4. **Communication**: + - Be explicit in task delegation instructions to avoid ambiguity and ensure effective task execution. + - Require agents to report back on task completion or encountered issues. + +5. **Reasoning and Decisions**: + - Offer brief reasoning when selecting or creating agents to maintain transparency. + - Avoid using an agent if unnecessary, with a clear explanation if no agents are suitable for a task. + +# Output Format + +Present your plan in clear, bullet-point format or short concise paragraphs, outlining task assignment, agent creation, efficiency strategies, and communication protocols. + +# Notes + +- Preserve transparency by always providing reasoning for task-agent assignments and creation. +- Ensure instructions to agents are unambiguous to minimize error. + +""" + + +class AgentConfig(BaseModel): + """Configuration for an individual agent in a swarm""" + + name: str = Field( + description="The name of the agent", example="Research-Agent" + ) + description: str = Field( + description="A description of the agent's purpose and capabilities", + example="Agent responsible for researching and gathering information", + ) + system_prompt: str = Field( + description="The system prompt that defines the agent's behavior", + example="You are a research agent. Your role is to gather and analyze information...", + ) + + @validator("name") + def validate_name(cls, v): + if not v.strip(): + raise ValueError("Agent name cannot be empty") + return v.strip() + + @validator("system_prompt") + def validate_system_prompt(cls, v): + if not v.strip(): + raise ValueError("System prompt cannot be empty") + return v.strip() + + +class SwarmConfig(BaseModel): + """Configuration for a swarm of cooperative agents""" + + name: str = Field( + description="The name of the swarm", + example="Research-Writing-Swarm", + ) + description: str = Field( + description="The description of the swarm's purpose and capabilities", + example="A swarm of agents that work together to research topics and write articles", + ) + agents: List[AgentConfig] = Field( + description="The list of agents that make up the swarm", + min_items=1, + ) + + @validator("agents") + def validate_agents(cls, v): + if not v: + raise ValueError("Swarm must have at least one agent") + return v + + +class AutoSwarmBuilder: + """A class that automatically builds and manages swarms of AI agents with enhanced error handling.""" + + def __init__( + self, + name: Optional[str] = None, + description: Optional[str] = None, + verbose: bool = True, + api_key: Optional[str] = None, + model_name: str = "gpt-4", + ): + self.name = name or "DefaultSwarm" + self.description = description or "Generic AI Agent Swarm" + self.verbose = verbose + self.agents_pool = [] + self.api_key = api_key or os.getenv("OPENAI_API_KEY") + self.model_name = model_name + + if not self.api_key: + raise ValueError( + "OpenAI API key must be provided either through initialization or environment variable" + ) + + logger.info( + "Initialized AutoSwarmBuilder", + extra={ + "swarm_name": self.name, + "description": self.description, + "model": self.model_name, + }, + ) + + # Initialize OpenAI chat model + try: + self.chat_model = OpenAIChat( + openai_api_key=self.api_key, + 
model_name=self.model_name, + temperature=0.1, + ) + except Exception as e: + logger.error( + f"Failed to initialize OpenAI chat model: {str(e)}" + ) + raise + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=4, max=10), + ) + def run(self, task: str, image_url: Optional[str] = None) -> str: + """Run the swarm on a given task with error handling and retries.""" + if not task or not task.strip(): + raise ValueError("Task cannot be empty") + + logger.info("Starting swarm execution", extra={"task": task}) + + try: + # Create agents for the task + agents = self._create_agents(task, image_url) + if not agents: + raise ValueError( + "No agents were created for the task" + ) + + # Execute the task through the swarm router + logger.info( + "Routing task through swarm", + extra={"num_agents": len(agents)}, + ) + output = self.swarm_router(agents, task, image_url) + + logger.info("Swarm execution completed successfully") + return output + + except Exception as e: + logger.error( + f"Error during swarm execution: {str(e)}", + exc_info=True, + ) + raise + + def _create_agents( + self, task: str, image_url: Optional[str] = None + ) -> List[Agent]: + """Create the necessary agents for a task with enhanced error handling.""" + logger.info("Creating agents for task", extra={"task": task}) + + try: + model = OpenAIFunctionCaller( + system_prompt=BOSS_SYSTEM_PROMPT, + api_key=self.api_key, + temperature=0.1, + base_model=SwarmConfig, + ) + + agents_config = model.run(task) + print(f"{agents_config}") + + if isinstance(agents_config, dict): + agents_config = SwarmConfig(**agents_config) + + # Update swarm configuration + self.name = agents_config.name + self.description = agents_config.description + + # Create agents from configuration + agents = [] + for agent_config in agents_config.agents: + if isinstance(agent_config, dict): + agent_config = AgentConfig(**agent_config) + + agent = self.build_agent( + agent_name=agent_config.name, + agent_description=agent_config.description, + agent_system_prompt=agent_config.system_prompt, + ) + agents.append(agent) + + # Add available agents showcase to system prompts + agents_available = showcase_available_agents( + name=self.name, + description=self.description, + agents=agents, + ) + + for agent in agents: + agent.system_prompt += "\n" + agents_available + + logger.info( + "Successfully created agents", + extra={"num_agents": len(agents)}, + ) + return agents + + except Exception as e: + logger.error( + f"Error creating agents: {str(e)}", exc_info=True + ) + raise + + def build_agent( + self, + agent_name: str, + agent_description: str, + agent_system_prompt: str, + ) -> Agent: + """Build a single agent with enhanced error handling.""" + logger.info( + "Building agent", extra={"agent_name": agent_name} + ) + + try: + agent = Agent( + agent_name=agent_name, + description=agent_description, + system_prompt=agent_system_prompt, + llm=self.chat_model, + autosave=True, + dashboard=False, + verbose=self.verbose, + dynamic_temperature_enabled=True, + saved_state_path=f"states/{agent_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", + user_name="swarms_corp", + retry_attempts=3, + context_length=200000, + return_step_meta=False, + output_type="str", + streaming_on=False, + auto_generate_prompt=True, + ) + return agent + + except Exception as e: + logger.error( + f"Error building agent: {str(e)}", exc_info=True + ) + raise + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=4, max=10), + ) + def 
swarm_router( + self, + agents: List[Agent], + task: str, + image_url: Optional[str] = None, + ) -> str: + """Route tasks between agents in the swarm with error handling and retries.""" + logger.info( + "Initializing swarm router", + extra={"num_agents": len(agents)}, + ) + + try: + swarm_router_instance = SwarmRouter( + name=self.name, + description=self.description, + agents=agents, + swarm_type="auto", + ) + + formatted_task = f"{self.name} {self.description} {task}" + result = swarm_router_instance.run(formatted_task) + + logger.info("Successfully completed swarm routing") + return result + + except Exception as e: + logger.error( + f"Error in swarm router: {str(e)}", exc_info=True + ) + raise + + +swarm = AutoSwarmBuilder( + name="ChipDesign-Swarm", + description="A swarm of specialized AI agents for chip design", + api_key="your-api-key", # Optional if set in environment + model_name="gpt-4", # Optional, defaults to gpt-4 +) + +result = swarm.run( + "Design a new AI accelerator chip optimized for transformer model inference..." +) diff --git a/swarm_router_example.py b/swarm_router_example.py deleted file mode 100644 index d7397457..00000000 --- a/swarm_router_example.py +++ /dev/null @@ -1,165 +0,0 @@ - -from swarms import Agent, SwarmRouter - -# Portfolio Analysis Specialist -portfolio_analyzer = Agent( - agent_name="Portfolio-Analysis-Specialist", - system_prompt="""You are an expert portfolio analyst specializing in fund analysis and selection. Your core competencies include: - - Comprehensive analysis of mutual funds, ETFs, and index funds - - Evaluation of fund performance metrics (expense ratios, tracking error, Sharpe ratio) - - Assessment of fund composition and strategy alignment - - Risk-adjusted return analysis - - Tax efficiency considerations - - For each portfolio analysis: - 1. Evaluate fund characteristics and performance metrics - 2. Analyze expense ratios and fee structures - 3. Assess historical performance and volatility - 4. Compare funds within same category - 5. Consider tax implications - 6. Review fund manager track record and strategy consistency - - Maintain focus on cost-efficiency and alignment with investment objectives.""", - model_name="gpt-4o", - max_loops=1, - saved_state_path="portfolio_analyzer.json", - user_name="investment_team", - retry_attempts=2, - context_length=200000, - output_type="string", -) - -# Asset Allocation Strategist -allocation_strategist = Agent( - agent_name="Asset-Allocation-Strategist", - system_prompt="""You are a specialized asset allocation strategist focused on portfolio construction and optimization. Your expertise includes: - - Strategic and tactical asset allocation - - Risk tolerance assessment and portfolio matching - - Geographic and sector diversification - - Rebalancing strategy development - - Portfolio optimization using modern portfolio theory - - For each allocation: - 1. Analyze investor risk tolerance and objectives - 2. Develop appropriate asset class weights - 3. Select optimal fund combinations - 4. Design rebalancing triggers and schedules - 5. Consider tax-efficient fund placement - 6. 
Account for correlation between assets - - Focus on creating well-diversified portfolios aligned with client goals and risk tolerance.""", - model_name="gpt-4o", - max_loops=1, - saved_state_path="allocation_strategist.json", - user_name="investment_team", - retry_attempts=2, - context_length=200000, - output_type="string", -) - -# Risk Management Specialist -risk_manager = Agent( - agent_name="Risk-Management-Specialist", - system_prompt="""You are a risk management specialist focused on portfolio risk assessment and mitigation. Your expertise covers: - - Portfolio risk metrics analysis - - Downside protection strategies - - Correlation analysis between funds - - Stress testing and scenario analysis - - Market condition impact assessment - - For each portfolio: - 1. Calculate key risk metrics (Beta, Standard Deviation, etc.) - 2. Analyze correlation matrices - 3. Perform stress tests under various scenarios - 4. Evaluate liquidity risks - 5. Assess concentration risks - 6. Monitor factor exposures - - Focus on maintaining appropriate risk levels while maximizing risk-adjusted returns.""", - model_name="gpt-4o", - max_loops=1, - saved_state_path="risk_manager.json", - user_name="investment_team", - retry_attempts=2, - context_length=200000, - output_type="string", -) - -# Portfolio Implementation Specialist -implementation_specialist = Agent( - agent_name="Portfolio-Implementation-Specialist", - system_prompt="""You are a portfolio implementation specialist focused on efficient execution and maintenance. Your responsibilities include: - - Fund selection for specific asset class exposure - - Tax-efficient implementation strategies - - Portfolio rebalancing execution - - Trading cost analysis - - Cash flow management - - For each implementation: - 1. Select most efficient funds for desired exposure - 2. Plan tax-efficient transitions - 3. Design rebalancing schedule - 4. Optimize trade execution - 5. Manage cash positions - 6. Monitor tracking error - - Maintain focus on minimizing costs and maximizing tax efficiency during implementation.""", - model_name="gpt-4o", - max_loops=1, - saved_state_path="implementation_specialist.json", - user_name="investment_team", - retry_attempts=2, - context_length=200000, - output_type="string", -) - -# Portfolio Monitoring Specialist -monitoring_specialist = Agent( - agent_name="Portfolio-Monitoring-Specialist", - system_prompt="""You are a portfolio monitoring specialist focused on ongoing portfolio oversight and optimization. Your expertise includes: - - Regular portfolio performance review - - Drift monitoring and rebalancing triggers - - Fund changes and replacements - - Tax loss harvesting opportunities - - Performance attribution analysis - - For each review: - 1. Track portfolio drift from targets - 2. Monitor fund performance and changes - 3. Identify tax loss harvesting opportunities - 4. Analyze tracking error and expenses - 5. Review risk metrics evolution - 6. 
Generate performance attribution reports - - Ensure continuous alignment with investment objectives while maintaining optimal portfolio efficiency.""", - model_name="gpt-4o", - max_loops=1, - saved_state_path="monitoring_specialist.json", - user_name="investment_team", - retry_attempts=2, - context_length=200000, - output_type="string", -) - -# List of all agents for portfolio management -portfolio_agents = [ - portfolio_analyzer, - allocation_strategist, - risk_manager, - implementation_specialist, - monitoring_specialist -] - - -# Router -router = SwarmRouter( - name = "etf-portfolio-management-swarm", - description="Creates and suggests an optimal portfolio", - agents = portfolio_agents, - swarm_type="SequentialWorkflow", # ConcurrentWorkflow - max_loops = 1, -) - -router.run( - task = "I have 10,000$ and I want to create a porfolio based on energy, ai, and datacenter companies. high growth." -) \ No newline at end of file diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index d6caed66..b9df9157 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -1,13 +1,14 @@ +from datetime import datetime import asyncio import json import logging import os import random +import sys import threading import time import uuid from concurrent.futures import ThreadPoolExecutor -from datetime import datetime from typing import ( Any, Callable, @@ -21,12 +22,9 @@ from typing import ( import toml import yaml -from loguru import logger from pydantic import BaseModel from swarm_models.tiktoken_wrapper import TikTokenizer - from swarms.agents.ape_agent import auto_generate_prompt -from swarms.artifacts.main_artifact import Artifact from swarms.prompts.agent_system_prompts import AGENT_SYSTEM_PROMPT_3 from swarms.prompts.multi_modal_autonomous_instruction_prompt import ( MULTI_MODAL_AUTO_AGENT_SYSTEM_PROMPT_1, @@ -40,19 +38,22 @@ from swarms.schemas.base_schemas import ( ) from swarms.structs.concat import concat_strings from swarms.structs.conversation import Conversation -from swarms.structs.safe_loading import ( - SafeLoaderUtils, - SafeStateManager, -) from swarms.tools.base_tool import BaseTool +from swarms.tools.func_calling_utils import ( + prepare_output_for_output_model, +) from swarms.tools.tool_parse_exec import parse_and_execute_json from swarms.utils.data_to_text import data_to_text from swarms.utils.file_processing import create_file_in_folder -from swarms.utils.formatter import formatter from swarms.utils.pdf_to_text import pdf_to_text +from swarms.artifacts.main_artifact import Artifact +from swarms.utils.loguru_logger import initialize_logger from swarms.utils.wrapper_clusterop import ( exec_callable_with_clusterops, ) +from swarms.utils.formatter import formatter + +logger = initialize_logger(log_folder="agents") # Utils @@ -135,6 +136,7 @@ class Agent: callback (Callable): The callback function metadata (Dict[str, Any]): The metadata callbacks (List[Callable]): The list of callback functions + logger_handler (Any): The logger handler search_algorithm (Callable): The search algorithm logs_to_filename (str): The filename for the logs evaluator (Callable): The evaluator function @@ -269,6 +271,7 @@ class Agent: callback: Optional[Callable] = None, metadata: Optional[Dict[str, Any]] = None, callbacks: Optional[List[Callable]] = None, + logger_handler: Optional[Any] = sys.stderr, search_algorithm: Optional[Callable] = None, logs_to_filename: Optional[str] = None, evaluator: Optional[Callable] = None, # Custom LLM or agent @@ -294,6 +297,7 @@ class Agent: 
algorithm_of_thoughts: bool = False, tree_of_thoughts: bool = False, tool_choice: str = "auto", + execute_tool: bool = False, rules: str = None, # type: ignore planning: Optional[str] = False, planning_prompt: Optional[str] = None, @@ -315,7 +319,7 @@ class Agent: use_cases: Optional[List[Dict[str, str]]] = None, step_pool: List[Step] = [], print_every_step: Optional[bool] = False, - time_created: Optional[str] = time.strftime( + time_created: Optional[float] = time.strftime( "%Y-%m-%d %H:%M:%S", time.localtime() ), agent_output: ManySteps = None, @@ -336,7 +340,6 @@ class Agent: all_gpus: bool = False, model_name: str = None, llm_args: dict = None, - load_state_path: str = None, *args, **kwargs, ): @@ -387,6 +390,7 @@ class Agent: self.callback = callback self.metadata = metadata self.callbacks = callbacks + self.logger_handler = logger_handler self.search_algorithm = search_algorithm self.logs_to_filename = logs_to_filename self.evaluator = evaluator @@ -410,6 +414,7 @@ class Agent: self.algorithm_of_thoughts = algorithm_of_thoughts self.tree_of_thoughts = tree_of_thoughts self.tool_choice = tool_choice + self.execute_tool = execute_tool self.planning = planning self.planning_prompt = planning_prompt self.custom_planning_prompt = custom_planning_prompt @@ -452,7 +457,6 @@ class Agent: self.all_gpus = all_gpus self.model_name = model_name self.llm_args = llm_args - self.load_state_path = load_state_path # Initialize the short term memory self.short_memory = Conversation( @@ -498,6 +502,10 @@ class Agent: if preset_stopping_token is not None: self.stopping_token = "" + # # Check the parameters + # # Telemetry Processor to log agent data + # threading.Thread(target=self.agent_initialization()).start + # If the docs exist then ingest the docs if exists(self.docs): threading.Thread( @@ -538,6 +546,19 @@ class Agent: tool.__name__: tool for tool in tools } + # Set the logger handler + if exists(logger_handler): + log_file_path = os.path.join( + self.workspace_dir, f"{self.agent_name}.log" + ) + logger.add( + log_file_path, + level="INFO", + colorize=True, + backtrace=True, + diagnose=True, + ) + # If the tool schema exists or a list of base models exists then convert the tool schema into an openai schema if exists(tool_schema) or exists(list_base_models): threading.Thread( @@ -572,23 +593,20 @@ class Agent: # Telemetry Processor to log agent data threading.Thread(target=self.log_agent_data).start() - if self.llm is None and self.model_name is not None: - self.llm = self.llm_handling() + threading.Thread(target=self.llm_handling()) def llm_handling(self): - from swarms.utils.litellm_wrapper import LiteLLM - if self.llm_args is not None: - llm = LiteLLM(model_name=self.model_name, **self.llm_args) + if self.llm is None: + from swarms.utils.litellm_wrapper import LiteLLM - else: - llm = LiteLLM( - model_name=self.model_name, - temperature=self.temperature, - max_tokens=self.max_tokens, - ) + if self.llm_args is not None: + self.llm = LiteLLM( + model_name=self.model_name, **self.llm_args + ) - return llm + else: + self.llm = LiteLLM(model_name=self.model_name) def check_if_no_prompt_then_autogenerate(self, task: str = None): """ @@ -802,9 +820,6 @@ class Agent: # Print the user's request - if self.autosave: - self.save() - # Print the request if print_task is True: formatter.print_panel( @@ -889,6 +904,13 @@ class Agent: # Check and execute tools if self.tools is not None: self.parse_and_execute_tools(response) + # if tool_result: + # self.update_tool_usage( + # step_meta["step_id"], + # 
tool_result["tool"], + # tool_result["args"], + # tool_result["response"], + # ) # Add the response to the memory self.short_memory.add( @@ -922,12 +944,6 @@ class Agent: success = True # Mark as successful to exit the retry loop except Exception as e: - - self.log_agent_data() - - if self.autosave is True: - self.save() - logger.error( f"Attempt {attempt+1}: Error generating" f" response: {e}" @@ -935,12 +951,6 @@ class Agent: attempt += 1 if not success: - - self.log_agent_data() - - if self.autosave is True: - self.save() - logger.error( "Failed to generate a valid response after" " retry attempts." @@ -984,10 +994,8 @@ class Agent: time.sleep(self.loop_interval) if self.autosave is True: - self.log_agent_data() - - if self.autosave is True: - self.save() + logger.info("Autosaving agent state.") + self.save_state() # Apply the cleaner function to the response if self.output_cleaner is not None: @@ -1029,9 +1037,10 @@ class Agent: self.artifacts_file_extension, ) - self.log_agent_data() - if self.autosave is True: - self.save() + try: + self.log_agent_data() + except Exception: + pass # More flexible output types if ( @@ -1041,10 +1050,7 @@ class Agent: return concat_strings(all_responses) elif self.output_type == "list": return all_responses - elif ( - self.output_type == "json" - or self.return_step_meta is True - ): + elif self.output_type == "json": return self.agent_output.model_dump_json(indent=4) elif self.output_type == "csv": return self.dict_to_csv( @@ -1056,6 +1062,8 @@ class Agent: return yaml.safe_dump( self.agent_output.model_dump(), sort_keys=False ) + elif self.return_step_meta is True: + return self.agent_output.model_dump_json(indent=4) elif self.return_history is True: history = self.short_memory.get_str() @@ -1069,74 +1077,18 @@ class Agent: ) except Exception as error: - self._handle_run_error(error) + self.log_agent_data() + logger.info( + f"Error running agent: {error} optimize your input parameters" + ) + raise error except KeyboardInterrupt as error: - self._handle_run_error(error) - - def _handle_run_error(self, error: any): - self.log_agent_data() - - if self.autosave is True: - self.save() - - logger.info( - f"Error detected running your agent {self.agent_name} \n Error {error} \n Optimize your input parameters and or add an issue on the swarms github and contact our team on discord for support ;) " - ) - raise error - - async def arun( - self, - task: Optional[str] = None, - img: Optional[str] = None, - is_last: bool = False, - device: str = "cpu", # gpu - device_id: int = 1, - all_cores: bool = True, - do_not_use_cluster_ops: bool = True, - all_gpus: bool = False, - *args, - **kwargs, - ) -> Any: - """ - Asynchronously runs the agent with the specified parameters. - - Args: - task (Optional[str]): The task to be performed. Defaults to None. - img (Optional[str]): The image to be processed. Defaults to None. - is_last (bool): Indicates if this is the last task. Defaults to False. - device (str): The device to use for execution. Defaults to "cpu". - device_id (int): The ID of the GPU to use if device is set to "gpu". Defaults to 1. - all_cores (bool): If True, uses all available CPU cores. Defaults to True. - do_not_use_cluster_ops (bool): If True, does not use cluster operations. Defaults to True. - all_gpus (bool): If True, uses all available GPUs. Defaults to False. - *args: Additional positional arguments. - **kwargs: Additional keyword arguments. - - Returns: - Any: The result of the asynchronous operation. 
- - Raises: - Exception: If an error occurs during the asynchronous operation. - """ - try: - return await asyncio.to_thread( - self.run, - task=task, - img=img, - is_last=is_last, - device=device, - device_id=device_id, - all_cores=all_cores, - do_not_use_cluster_ops=do_not_use_cluster_ops, - all_gpus=all_gpus, - *args, - **kwargs, + self.log_agent_data() + logger.info( + f"Error running agent: {error} optimize your input parameters" ) - except Exception as error: - await self._handle_run_error( - error - ) # Ensure this is also async if needed + raise error def __call__( self, @@ -1144,10 +1096,8 @@ class Agent: img: Optional[str] = None, is_last: bool = False, device: str = "cpu", # gpu - device_id: int = 1, + device_id: int = 0, all_cores: bool = True, - do_not_use_cluster_ops: bool = True, - all_gpus: bool = False, *args, **kwargs, ) -> Any: @@ -1162,19 +1112,33 @@ class Agent: all_cores (bool): If True, uses all available CPU cores. Defaults to True. """ try: - return self.run( - task=task, - img=img, - is_last=is_last, - device=device, - device_id=device_id, - all_cores=all_cores, - do_not_use_cluster_ops=do_not_use_cluster_ops, - all_gpus=all_gpus * args, - **kwargs, - ) + if task is not None: + return self.run( + task=task, + is_last=is_last, + device=device, + device_id=device_id, + all_cores=all_cores, + *args, + **kwargs, + ) + elif img is not None: + return self.run( + img=img, + is_last=is_last, + device=device, + device_id=device_id, + all_cores=all_cores, + *args, + **kwargs, + ) + else: + raise ValueError( + "Either 'task' or 'img' must be provided." + ) except Exception as error: - self._handle_run_error(error) + logger.error(f"Error calling agent: {error}") + raise error def dict_to_csv(self, data: dict) -> str: """ @@ -1201,31 +1165,33 @@ class Agent: return output.getvalue() def parse_and_execute_tools(self, response: str, *args, **kwargs): - try: - logger.info("Executing tool...") - - # try to Execute the tool and return a string - out = parse_and_execute_json( - functions=self.tools, - json_string=response, - parse_md=True, - *args, - **kwargs, - ) + # Try executing the tool + if self.execute_tool is not False: + try: + logger.info("Executing tool...") - out = str(out) + # try to Execute the tool and return a string + out = parse_and_execute_json( + self.tools, + response, + parse_md=True, + *args, + **kwargs, + ) - logger.info(f"Tool Output: {out}") + out = str(out) - # Add the output to the memory - self.short_memory.add( - role="Tool Executor", - content=out, - ) + logger.info(f"Tool Output: {out}") - except Exception as error: - logger.error(f"Error executing tool: {error}") - raise error + # Add the output to the memory + self.short_memory.add( + role="Tool Executor", + content=out, + ) + + except Exception as error: + logger.error(f"Error executing tool: {error}") + raise error def add_memory(self, message: str): """Add a memory to the agent @@ -1237,7 +1203,6 @@ class Agent: _type_: _description_ """ logger.info(f"Adding memory: {message}") - return self.short_memory.add( role=self.agent_name, content=message ) @@ -1296,9 +1261,7 @@ class Agent: try: logger.info(f"Running concurrent tasks: {tasks}") futures = [ - self.executor.submit( - self.run, task=task, *args, **kwargs - ) + self.executor.submit(self.run, task, *args, **kwargs) for task in tasks ] results = [future.result() for future in futures] @@ -1326,345 +1289,94 @@ class Agent: except Exception as error: logger.info(f"Error running bulk run: {error}", "red") - async def arun_batched( - self, - tasks: 
List[str], - *args, - **kwargs, - ): - """Asynchronously runs a batch of tasks.""" + def save(self) -> None: + """Save the agent history to a file. + + Args: + file_path (_type_): _description_ + """ + file_path = ( + f"{self.saved_state_path}.json" + or f"{self.agent_name}.json" + or f"{self.saved_state_path}.json" + ) try: - # Create a list of coroutines for each task - coroutines = [ - self.arun(task=task, *args, **kwargs) - for task in tasks - ] - # Use asyncio.gather to run them concurrently - results = await asyncio.gather(*coroutines) - return results + create_file_in_folder( + self.workspace_dir, + file_path, + self.to_json(), + ) + logger.info(f"Saved agent history to: {file_path}") except Exception as error: - logger.error(f"Error running batched tasks: {error}") - raise + logger.error(f"Error saving agent history: {error}") + raise error - def save(self, file_path: str = None) -> None: + def load(self, file_path: str) -> None: """ - Save the agent state to a file using SafeStateManager with atomic writing - and backup functionality. Automatically handles complex objects and class instances. + Load the agent history from a file, excluding the LLM. Args: - file_path (str, optional): Custom path to save the state. - If None, uses configured paths. + file_path (str): The path to the file containing the saved agent history. Raises: - OSError: If there are filesystem-related errors + FileNotFoundError: If the specified file path does not exist + json.JSONDecodeError: If the file contains invalid JSON + AttributeError: If there are issues setting agent attributes Exception: For other unexpected errors """ try: - # Determine the save path - resolved_path = ( - file_path - or self.saved_state_path - or f"{self.agent_name}_state.json" + file_path = ( + f"{self.saved_state_path}.json" + or f"{self.agent_name}.json" + or f"{self.saved_state_path}.json" ) - # Ensure path has .json extension - if not resolved_path.endswith(".json"): - resolved_path += ".json" - - # Create full path including workspace directory - full_path = os.path.join( - self.workspace_dir, resolved_path - ) - backup_path = full_path + ".backup" - temp_path = full_path + ".temp" - - # Ensure workspace directory exists - os.makedirs(os.path.dirname(full_path), exist_ok=True) - - # First save to temporary file using SafeStateManager - SafeStateManager.save_state(self, temp_path) - - # If current file exists, create backup - if os.path.exists(full_path): - try: - os.replace(full_path, backup_path) - except Exception as e: - logger.warning(f"Could not create backup: {e}") - - # Move temporary file to final location - os.replace(temp_path, full_path) - - # Clean up old backup if everything succeeded - if os.path.exists(backup_path): - try: - os.remove(backup_path) - except Exception as e: - logger.warning( - f"Could not remove backup file: {e}" - ) - - # Log saved state information if verbose - if self.verbose: - self._log_saved_state_info(full_path) - - logger.info( - f"Successfully saved agent state to: {full_path}" - ) - - # Handle additional component saves - self._save_additional_components(full_path) - - except OSError as e: - logger.error( - f"Filesystem error while saving agent state: {e}" - ) - raise - except Exception as e: - logger.error(f"Unexpected error saving agent state: {e}") - raise - - def _save_additional_components(self, base_path: str) -> None: - """Save additional agent components like memory.""" - try: - # Save long term memory if it exists - if ( - hasattr(self, "long_term_memory") - and 
self.long_term_memory is not None - ): - memory_path = ( - f"{os.path.splitext(base_path)[0]}_memory.json" + if not os.path.exists(file_path): + raise FileNotFoundError( + f"File not found at path: {file_path}" ) - try: - self.long_term_memory.save(memory_path) - logger.info( - f"Saved long-term memory to: {memory_path}" - ) - except Exception as e: - logger.warning( - f"Could not save long-term memory: {e}" - ) - # Save memory manager if it exists - if ( - hasattr(self, "memory_manager") - and self.memory_manager is not None - ): - manager_path = f"{os.path.splitext(base_path)[0]}_memory_manager.json" + with open(file_path, "r") as file: try: - self.memory_manager.save_memory_snapshot( - manager_path - ) - logger.info( - f"Saved memory manager state to: {manager_path}" - ) - except Exception as e: - logger.warning( - f"Could not save memory manager: {e}" + data = json.load(file) + except json.JSONDecodeError as e: + logger.error( + f"Invalid JSON in file {file_path}: {str(e)}" ) + raise - except Exception as e: - logger.warning(f"Error saving additional components: {e}") - - def enable_autosave(self, interval: int = 300) -> None: - """ - Enable automatic saving of agent state using SafeStateManager at specified intervals. - - Args: - interval (int): Time between saves in seconds. Defaults to 300 (5 minutes). - """ - - def autosave_loop(): - while self.autosave: - try: - self.save() - if self.verbose: - logger.debug( - f"Autosaved agent state (interval: {interval}s)" - ) - except Exception as e: - logger.error(f"Autosave failed: {e}") - time.sleep(interval) - - self.autosave = True - self.autosave_thread = threading.Thread( - target=autosave_loop, - daemon=True, - name=f"{self.agent_name}_autosave", - ) - self.autosave_thread.start() - logger.info(f"Enabled autosave with {interval}s interval") - - def disable_autosave(self) -> None: - """Disable automatic saving of agent state.""" - if hasattr(self, "autosave"): - self.autosave = False - if hasattr(self, "autosave_thread"): - self.autosave_thread.join(timeout=1) - delattr(self, "autosave_thread") - logger.info("Disabled autosave") - - def cleanup(self) -> None: - """Cleanup method to be called on exit. Ensures final state is saved.""" - try: - if getattr(self, "autosave", False): - logger.info( - "Performing final autosave before exit..." - ) - self.disable_autosave() - self.save() - except Exception as e: - logger.error(f"Error during cleanup: {e}") - - def load(self, file_path: str = None) -> None: - """ - Load agent state from a file using SafeStateManager. - Automatically preserves class instances and complex objects. - - Args: - file_path (str, optional): Path to load state from. - If None, uses default path from agent config. 
- - Raises: - FileNotFoundError: If state file doesn't exist - Exception: If there's an error during loading - """ - try: - # Resolve load path conditionally with a check for self.load_state_path - resolved_path = ( - file_path - or self.load_state_path - or ( - f"{self.saved_state_path}.json" - if self.saved_state_path - else ( - f"{self.agent_name}.json" - if self.agent_name - else ( - f"{self.workspace_dir}/{self.agent_name}_state.json" - if self.workspace_dir and self.agent_name - else None - ) - ) + if not isinstance(data, dict): + raise ValueError( + f"Expected dict data but got {type(data)}" ) - ) - - # Load state using SafeStateManager - SafeStateManager.load_state(self, resolved_path) - # Reinitialize any necessary runtime components - self._reinitialize_after_load() + # Store current LLM + current_llm = self.llm - if self.verbose: - self._log_loaded_state_info(resolved_path) - - except FileNotFoundError: - logger.error(f"State file not found: {resolved_path}") - raise - except Exception as e: - logger.error(f"Error loading agent state: {e}") - raise - - def _reinitialize_after_load(self) -> None: - """ - Reinitialize necessary components after loading state. - Called automatically after load() to ensure all components are properly set up. - """ - try: - # Reinitialize conversation if needed - if ( - not hasattr(self, "short_memory") - or self.short_memory is None - ): - self.short_memory = Conversation( - system_prompt=self.system_prompt, - time_enabled=True, - user=self.user_name, - rules=self.rules, - ) - - # Reinitialize executor if needed - if not hasattr(self, "executor") or self.executor is None: - self.executor = ThreadPoolExecutor( - max_workers=os.cpu_count() + try: + for key, value in data.items(): + if key != "llm": + setattr(self, key, value) + except AttributeError as e: + logger.error( + f"Error setting agent attribute: {str(e)}" ) + raise - # # Reinitialize tool structure if needed - # if hasattr(self, 'tools') and (self.tools or getattr(self, 'list_base_models', None)): - # self.tool_struct = BaseTool( - # tools=self.tools, - # base_models=getattr(self, 'list_base_models', None), - # tool_system_prompt=self.tool_system_prompt - # ) - - except Exception as e: - logger.error(f"Error reinitializing components: {e}") - raise + # Restore LLM + self.llm = current_llm - def _log_saved_state_info(self, file_path: str) -> None: - """Log information about saved state for debugging""" - try: - state_dict = SafeLoaderUtils.create_state_dict(self) - preserved = SafeLoaderUtils.preserve_instances(self) - - logger.info(f"Saved agent state to: {file_path}") - logger.debug( - f"Saved {len(state_dict)} configuration values" - ) - logger.debug( - f"Preserved {len(preserved)} class instances" + logger.info( + f"Successfully loaded agent history from: {file_path}" ) - if self.verbose: - logger.debug("Preserved instances:") - for name, instance in preserved.items(): - logger.debug( - f" - {name}: {type(instance).__name__}" - ) except Exception as e: - logger.error(f"Error logging state info: {e}") - - def _log_loaded_state_info(self, file_path: str) -> None: - """Log information about loaded state for debugging""" - try: - state_dict = SafeLoaderUtils.create_state_dict(self) - preserved = SafeLoaderUtils.preserve_instances(self) - - logger.info(f"Loaded agent state from: {file_path}") - logger.debug( - f"Loaded {len(state_dict)} configuration values" - ) - logger.debug( - f"Preserved {len(preserved)} class instances" + logger.error( + f"Unexpected error loading agent history: {str(e)}" 
) + raise - if self.verbose: - logger.debug("Current class instances:") - for name, instance in preserved.items(): - logger.debug( - f" - {name}: {type(instance).__name__}" - ) - except Exception as e: - logger.error(f"Error logging state info: {e}") - - def get_saveable_state(self) -> Dict[str, Any]: - """ - Get a dictionary of all saveable state values. - Useful for debugging or manual state inspection. - - Returns: - Dict[str, Any]: Dictionary of saveable values - """ - return SafeLoaderUtils.create_state_dict(self) - - def get_preserved_instances(self) -> Dict[str, Any]: - """ - Get a dictionary of all preserved class instances. - Useful for debugging or manual state inspection. - - Returns: - Dict[str, Any]: Dictionary of preserved instances - """ - return SafeLoaderUtils.preserve_instances(self) + return None def graceful_shutdown(self): """Gracefully shutdown the system saving the state""" @@ -1758,6 +1470,24 @@ class Agent: def get_llm_parameters(self): return str(vars(self.llm)) + def save_state(self, *args, **kwargs) -> None: + """ + Saves the current state of the agent to a JSON file, including the llm parameters. + + Args: + file_path (str): The path to the JSON file where the state will be saved. + + Example: + >>> agent.save_state('saved_flow.json') + """ + try: + logger.info(f"Saving Agent {self.agent_name}") + self.save() + logger.info("Saved agent state") + except Exception as error: + logger.error(f"Error saving agent state: {error}") + raise error + def update_system_prompt(self, system_prompt: str): """Upddate the system message""" self.system_prompt = system_prompt @@ -1992,6 +1722,53 @@ class Agent: except Exception as e: print(f"Error occurred during sentiment analysis: {e}") + def count_and_shorten_context_window( + self, history: str, *args, **kwargs + ): + """ + Count the number of tokens in the context window and shorten it if it exceeds the limit. + + Args: + history (str): The history of the conversation. + + Returns: + str: The shortened context window. + """ + # Count the number of tokens in the context window + count = self.tokenizer.count_tokens(history) + + # Shorten the context window if it exceeds the limit, keeping the last n tokens, need to implement the indexing + if count > self.context_length: + history = history[-self.context_length :] + + return history + + def output_cleaner_and_output_type( + self, response: str, *args, **kwargs + ): + """ + Applies the output cleaner function to the response and prepares the output for the output model. + + Args: + response (str): The response to be processed. + + Returns: + str: The processed response. 
+ """ + # Apply the cleaner function to the response + if self.output_cleaner is not None: + logger.info("Applying output cleaner to response.") + response = self.output_cleaner(response) + logger.info(f"Response after output cleaner: {response}") + + # Prepare the output for the output model + if self.output_type is not None: + # logger.info("Preparing output for output model.") + response = prepare_output_for_output_model(response) + print(f"Response after output model: {response}") + + return response + def stream_response( self, response: str, delay: float = 0.001 ) -> None: @@ -2023,6 +1800,37 @@ class Agent: except Exception as e: print(f"An error occurred during streaming: {e}") + def dynamic_context_window(self): + """ + dynamic_context_window essentially clears everything execep + the system prompt and leaves the rest of the contxt window + for RAG query tokens + + """ + # Count the number of tokens in the short term memory + logger.info("Dynamic context window shuffling enabled") + count = self.tokenizer.count_tokens( + self.short_memory.return_history_as_string() + ) + logger.info(f"Number of tokens in memory: {count}") + + # Dynamically allocating everything except the system prompt to be dynamic + # We need to query the short_memory dict, for the system prompt slot + # Then delete everything after that + + if count > self.context_length: + self.short_memory = self.short_memory[ + -self.context_length : + ] + logger.info( + f"Short term memory has been truncated to {self.context_length} tokens" + ) + else: + logger.info("Short term memory is within the limit") + + # Return the memory as a string or update the short term memory + # return memory + def check_available_tokens(self): # Log the amount of tokens left in the memory and in the task if self.tokenizer is not None: @@ -2048,6 +1856,58 @@ class Agent: return out + def truncate_string_by_tokens( + self, input_string: str, limit: int + ) -> str: + """ + Truncate a string if it exceeds a specified number of tokens using a given tokenizer. + + :param input_string: The input string to be tokenized and truncated. + :param tokenizer: The tokenizer function to be used for tokenizing the input string. + :param max_tokens: The maximum number of tokens allowed. + :return: The truncated string if it exceeds the maximum number of tokens; otherwise, the original string. + """ + # Tokenize the input string + tokens = self.tokenizer.count_tokens(input_string) + + # Check if the number of tokens exceeds the maximum limit + if len(tokens) > limit: + # Truncate the tokens to the maximum allowed tokens + truncated_tokens = tokens[: self.context_length] + # Join the truncated tokens back to a string + truncated_string = " ".join(truncated_tokens) + return truncated_string + else: + return input_string + + def tokens_operations(self, input_string: str) -> str: + """ + Perform various operations on tokens of an input string. + + :param input_string: The input string to be processed. + :return: The processed string. 
+ """ + # Tokenize the input string + tokens = self.tokenizer.count_tokens(input_string) + + # Check if the number of tokens exceeds the maximum limit + if len(tokens) > self.context_length: + # Truncate the tokens to the maximum allowed tokens + truncated_tokens = tokens[: self.context_length] + # Join the truncated tokens back to a string + truncated_string = " ".join(truncated_tokens) + return truncated_string + else: + # Log the amount of tokens left in the memory and in the task + if self.tokenizer is not None: + tokens_used = self.tokenizer.count_tokens( + self.short_memory.return_history_as_string() + ) + logger.info( + f"Tokens available: {tokens_used - self.context_length}" + ) + return input_string + def parse_function_call_and_execute(self, response: str): """ Parses a function call from the given response and executes it. @@ -2406,31 +2266,22 @@ class Agent: **kwargs: Arbitrary keyword arguments. Returns: - str: The result of the method call on the `llm` object. - - Raises: - AttributeError: If no suitable method is found in the llm object. - TypeError: If task is not a string or llm object is None. - ValueError: If task is empty. - """ - if not isinstance(task, str): - raise TypeError("Task must be a string") - - if not task.strip(): - raise ValueError("Task cannot be empty") - - if self.llm is None: - raise TypeError("LLM object cannot be None") - - try: - out = self.llm.run(task, *args, **kwargs) - - return out - except AttributeError as e: - logger.error( - f"Error calling LLM: {e} You need a class with a run(task: str) method" + The result of the method call on the `llm` object. + + """ + # Check if the llm has a __call__, or run, or any other method + if hasattr(self.llm, "__call__"): + return self.llm(task, *args, **kwargs) + elif hasattr(self.llm, "run"): + return self.llm.run(task, *args, **kwargs) + elif hasattr(self.llm, "generate"): + return self.llm.generate(task, *args, **kwargs) + elif hasattr(self.llm, "invoke"): + return self.llm.invoke(task, *args, **kwargs) + else: + raise AttributeError( + "No suitable method found in the llm object." 
) - raise e def handle_sop_ops(self): # If the user inputs a list of strings for the sop then join them and set the sop @@ -2455,8 +2306,9 @@ class Agent: device_id: Optional[int] = 0, all_cores: Optional[bool] = True, scheduled_run_date: Optional[datetime] = None, - do_not_use_cluster_ops: Optional[bool] = True, + do_not_use_cluster_ops: Optional[bool] = False, all_gpus: Optional[bool] = False, + generate_speech: Optional[bool] = False, *args, **kwargs, ) -> Any: @@ -2489,7 +2341,6 @@ class Agent: device_id = device_id or self.device_id all_cores = all_cores or self.all_cores all_gpus = all_gpus or self.all_gpus - do_not_use_cluster_ops = ( do_not_use_cluster_ops or self.do_not_use_cluster_ops ) @@ -2507,7 +2358,7 @@ class Agent: return self._run( task=task, img=img, - *args, + generate_speech=generate_speech * args, **kwargs, ) @@ -2520,15 +2371,17 @@ class Agent: func=self._run, task=task, img=img, + generate_speech=generate_speech, *args, **kwargs, ) except ValueError as e: - self._handle_run_error(e) - + logger.error(f"Invalid device specified: {e}") + raise e except Exception as e: - self._handle_run_error(e) + logger.error(f"An error occurred during execution: {e}") + raise e def handle_artifacts( self, text: str, file_output_path: str, file_extension: str @@ -2536,8 +2389,8 @@ class Agent: """Handle creating and saving artifacts with error handling.""" try: # Ensure file_extension starts with a dot - if not file_extension.startswith("."): - file_extension = "." + file_extension + if not file_extension.startswith('.'): + file_extension = '.' + file_extension # If file_output_path doesn't have an extension, treat it as a directory # and create a default filename based on timestamp @@ -2559,26 +2412,18 @@ class Agent: edit_count=0, ) - logger.info( - f"Saving artifact with extension: {file_extension}" - ) + logger.info(f"Saving artifact with extension: {file_extension}") artifact.save_as(file_extension) - logger.success( - f"Successfully saved artifact to {full_path}" - ) + logger.success(f"Successfully saved artifact to {full_path}") except ValueError as e: - logger.error( - f"Invalid input values for artifact: {str(e)}" - ) + logger.error(f"Invalid input values for artifact: {str(e)}") raise except IOError as e: logger.error(f"Error saving artifact to file: {str(e)}") raise except Exception as e: - logger.error( - f"Unexpected error handling artifact: {str(e)}" - ) + logger.error(f"Unexpected error handling artifact: {str(e)}") raise def showcase_config(self): diff --git a/swarms/structs/rearrange.py b/swarms/structs/rearrange.py index 8d7b6ff8..41f546c0 100644 --- a/swarms/structs/rearrange.py +++ b/swarms/structs/rearrange.py @@ -17,7 +17,6 @@ from swarms.utils.loguru_logger import initialize_logger from swarms.utils.wrapper_clusterop import ( exec_callable_with_clusterops, ) -from swarms.structs.output_types import OutputType logger = initialize_logger(log_folder="rearrange") diff --git a/swarms/telemetry/bootup.py b/swarms/telemetry/bootup.py index 87dc1c77..5e38c3ea 100644 --- a/swarms/telemetry/bootup.py +++ b/swarms/telemetry/bootup.py @@ -1,65 +1,27 @@ import os import logging import warnings -import concurrent.futures -from dotenv import load_dotenv -from loguru import logger +from concurrent.futures import ThreadPoolExecutor from swarms.utils.disable_logging import disable_logging def bootup(): - """Initialize swarms environment and configuration - - Handles environment setup, logging configuration, telemetry, - and workspace initialization. 
- """ + """Bootup swarms""" try: - # Load environment variables - load_dotenv() - - # Configure logging - if ( - os.getenv("SWARMS_VERBOSE_GLOBAL", "False").lower() - == "false" - ): - logger.disable("") - logging.disable(logging.CRITICAL) - - # Silent wandb + logging.disable(logging.CRITICAL) os.environ["WANDB_SILENT"] = "true" - # Configure workspace + # Auto set workspace directory workspace_dir = os.path.join(os.getcwd(), "agent_workspace") - os.makedirs(workspace_dir, exist_ok=True) + if not os.path.exists(workspace_dir): + os.makedirs(workspace_dir, exist_ok=True) os.environ["WORKSPACE_DIR"] = workspace_dir - # Suppress warnings warnings.filterwarnings("ignore", category=DeprecationWarning) - # Run telemetry functions concurrently - try: - with concurrent.futures.ThreadPoolExecutor( - max_workers=2 - ) as executor: - from swarms.telemetry.sentry_active import ( - activate_sentry, - ) - - future_disable_logging = executor.submit( - disable_logging - ) - future_sentry = executor.submit(activate_sentry) - - # Wait for completion and check for exceptions - future_disable_logging.result() - future_sentry.result() - except Exception as e: - logger.error(f"Error running telemetry functions: {e}") - + # Use ThreadPoolExecutor to run disable_logging and auto_update concurrently + with ThreadPoolExecutor(max_workers=1) as executor: + executor.submit(disable_logging) except Exception as e: - logger.error(f"Error during bootup: {str(e)}") + print(f"An error occurred: {str(e)}") raise - - -# Run bootup -bootup() diff --git a/swarms/telemetry/capture_sys_data.py b/swarms/telemetry/capture_sys_data.py index 4a09099b..9ef52976 100644 --- a/swarms/telemetry/capture_sys_data.py +++ b/swarms/telemetry/capture_sys_data.py @@ -39,8 +39,7 @@ def capture_system_data() -> Dict[str, str]: system_data["external_ip"] = requests.get( "https://api.ipify.org" ).text - except Exception as e: - logger.warning("Failed to retrieve external IP: {}", e) + except Exception: system_data["external_ip"] = "N/A" return system_data @@ -49,9 +48,7 @@ def capture_system_data() -> Dict[str, str]: return {} -def log_agent_data( - data_dict: dict -) -> dict | None: +def log_agent_data(data_dict: dict) -> dict | None: """ Logs agent data to the Swarms database with retry logic. @@ -76,25 +73,7 @@ def log_agent_data( "Authorization": "Bearer sk-f24a13ed139f757d99cdd9cdcae710fccead92681606a97086d9711f69d44869", } - try: - response = requests.post( - url, json=data_dict, headers=headers, timeout=10 - ) - response.raise_for_status() - - result = response.json() - return result - - except requests.exceptions.Timeout: - logger.warning("Request timed out") - - except requests.exceptions.HTTPError as e: - logger.error(f"HTTP error occurred: {e}") - if response.status_code == 401: - logger.error("Authentication failed - check API key") - - except requests.exceptions.RequestException as e: - logger.error(f"Error logging agent data: {e}") + requests.post(url, json=data_dict, headers=headers, timeout=10) + # response.raise_for_status() - logger.error("Failed to log agent data") return None diff --git a/swarms/utils/loguru_logger.py b/swarms/utils/loguru_logger.py index 0f0524b5..9c71276d 100644 --- a/swarms/utils/loguru_logger.py +++ b/swarms/utils/loguru_logger.py @@ -5,6 +5,7 @@ from loguru import logger import requests from swarms.telemetry.sys_info import system_info + def log_agent_data(data: Any) -> Dict: """ Send data to the agent logging API endpoint. 
@@ -36,6 +37,7 @@ def log_agent_data(data: Any) -> Dict: logger.error(f"Failed to log agent data: {e}") return {"error": str(e)} + def initialize_logger(log_folder: str = "logs"): """ Initialize and configure the Loguru logger. @@ -87,8 +89,9 @@ def initialize_logger(log_folder: str = "logs"): "metadata": system_info(), } response = log_agent_data(payload) - logger.debug(f"Sent to API: {payload}, Response: {response}") - + logger.debug( + f"Sent to API: {payload}, Response: {response}" + ) logger.add(AgentLogHandler(), level="INFO")
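A minimal usage sketch, not part of the patch itself: this changeset moves modules such as swarms/structs/agent.py and swarms/structs/rearrange.py from importing loguru directly to the centralized initialize_logger helper shown above. The example below assumes, as those call sites imply, that initialize_logger returns a configured loguru logger and writes files under the given log_folder; the folder name "my_module" and the log message are illustrative only.

from swarms.utils.loguru_logger import initialize_logger

# One logger per subsystem, named after its log folder (hypothetical name here).
# Per the AgentLogHandler sink added in this patch, INFO-level records are also
# forwarded to the logging API via log_agent_data().
logger = initialize_logger(log_folder="my_module")

logger.info("module initialized")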