diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml
index 8f1454b0..4f1e4bb3 100644
--- a/docs/mkdocs.yml
+++ b/docs/mkdocs.yml
@@ -227,6 +227,11 @@ nav:
           - Meme Agent Builder: "swarms/examples/meme_agents.md"
       - Multi-Agent Collaboration:
           - Swarms DAO: "swarms/examples/swarms_dao.md"
+
+      - Swarms UI:
+          - Overview: "swarms/ui/main.md"
+
+
   - Contributors:
       - Bounty Program: "corporate/bounty_program.md"
       - Contributing:
diff --git a/docs/requirements.txt b/docs/requirements.txt
index 1da89301..6ea713d2 100644
--- a/docs/requirements.txt
+++ b/docs/requirements.txt
@@ -30,7 +30,6 @@ pygments~=2.19
 pymdown-extensions~=10.14
 
 # Requirements for plugins
-babel~=2.16
 colorama~=0.4
 paginate~=0.5
 regex>=2022.4
\ No newline at end of file
diff --git a/docs/swarms/ui/main.md b/docs/swarms/ui/main.md
new file mode 100644
index 00000000..582c09b6
--- /dev/null
+++ b/docs/swarms/ui/main.md
@@ -0,0 +1,281 @@
# Swarms Chat UI Documentation

The Swarms Chat interface provides a customizable, multi-agent chat experience using Gradio. It supports various specialized AI agents—from finance to healthcare and news analysis—by leveraging Swarms models.

---

## Table of Contents

1. [Installation](#installation)
2. [Quick Start](#quick-start)
3. [Parameters Overview](#parameters-overview)
4. [Specialized Agents](#specialized-agents)
   - [Finance Agents](#finance-agents)
   - [Healthcare Agents](#healthcare-agents)
   - [News & Research Agents](#news--research-agents)
5. [Swarms Integration Features](#swarms-integration-features)
6. [Usage Examples](#usage-examples)
   - [Finance Agent Example](#finance-agent-example)
   - [Healthcare Agent Example](#healthcare-agent-example)
   - [News Analysis Agent Example](#news-analysis-agent-example)
7. [Setup and Deployment](#setup-and-deployment)
8. [Best Practices](#best-practices)
9. [Notes](#notes)

---

## Installation

Make sure you have Python 3.7+ installed, then install the required packages using pip:

```bash
pip install gradio ai-gradio swarms
```

---

## Quick Start

Below is a minimal example to get the Swarms Chat interface up and running. Customize the agent, title, and description as needed.

```python
import gradio as gr
import ai_gradio

# Create and launch a Swarms Chat interface
gr.load(
    name='swarms:gpt-4-turbo',          # Model identifier (supports OpenAI and others)
    src=ai_gradio.registry,             # Source module for model configurations
    agent_name="Stock-Analysis-Agent",  # Example agent from the Finance category
    title='Swarms Chat',
    description='Chat with an AI agent powered by Swarms'
).launch()
```

---

## Parameters Overview

When configuring your interface, consider the following parameters:

- **`name` (str):**
  Model identifier (e.g., `'swarms:gpt-4-turbo'`) that specifies which Swarms model to use.

- **`src` (module):**
  The source module (typically `ai_gradio.registry`) that contains model configurations.

- **`agent_name` (str):**
  The name of the specialized agent you wish to use (e.g., "Stock-Analysis-Agent").

- **`title` (str):**
  The title that appears at the top of the web interface.

- **`description` (str):**
  A short summary describing the functionality of the chat interface.
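To see how these options fit together, here is a minimal sketch that sets all five parameters explicitly. The model identifier and agent name are the same ones used elsewhere in this guide; the title and description strings are placeholder values you would replace with your own.

```python
import gradio as gr
import ai_gradio

# Sketch: every parameter from the overview above, set explicitly.
# 'swarms:gpt-4-turbo' and "Stock-Analysis-Agent" come from this guide;
# the title and description are illustrative placeholders.
interface = gr.load(
    name='swarms:gpt-4-turbo',          # which Swarms model to use
    src=ai_gradio.registry,             # module containing model configurations
    agent_name="Stock-Analysis-Agent",  # specialized agent to load
    title='My Swarms Chat',             # heading shown at the top of the UI
    description='A demo chat interface built on Swarms.',
)
interface.launch()
```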
+ +--- + +## Specialized Agents + +Swarms Chat supports multiple specialized agents designed for different domains. Below is an overview of available agent types. + +### Finance Agents + +1. **Stock Analysis Agent** + - **Capabilities:** + - Market analysis and stock recommendations. + - Both technical and fundamental analysis. + - Portfolio management suggestions. + +2. **Tax Planning Agent** + - **Capabilities:** + - Tax optimization strategies. + - Deduction analysis. + - Guidance on tax law compliance. + +### Healthcare Agents + +1. **Medical Diagnosis Assistant** + - **Capabilities:** + - Analysis of symptoms. + - Treatment recommendations. + - Research using current medical literature. + +2. **Healthcare Management Agent** + - **Capabilities:** + - Patient care coordination. + - Organization of medical records. + - Monitoring and tracking treatment plans. + +### News & Research Agents + +1. **News Analysis Agent** + - **Capabilities:** + - Real-time news aggregation. + - Filtering news by topics. + - Trend analysis and insights. + +2. **Research Assistant** + - **Capabilities:** + - Analysis of academic papers. + - Literature review support. + - Guidance on research methodologies. + +--- + +## Swarms Integration Features + +### Core Capabilities + +- **Multi-Agent Collaboration:** Multiple agents can be engaged simultaneously for a coordinated experience. +- **Real-Time Data Processing:** The interface processes and responds to queries in real time. +- **Natural Language Understanding:** Advanced NLP for context-aware and coherent responses. +- **Context-Aware Responses:** Responses are tailored based on conversation context. + +### Technical Features + +- **API Integration Support:** Easily connect with external APIs. +- **Custom Model Selection:** Choose the appropriate model for your specific task. +- **Concurrent Processing:** Supports multiple sessions concurrently. +- **Session Management:** Built-in session management ensures smooth user interactions. + +--- + +## Usage Examples + +Below are detailed examples for each type of specialized agent. + +### Finance Agent Example + +This example configures a chat interface for stock analysis: + +```python +import gradio as gr +import ai_gradio + +finance_interface = gr.load( + name='swarms:gpt-4-turbo', + src=ai_gradio.registry, + agent_name="Stock-Analysis-Agent", + title='Finance Assistant', + description='Expert financial analysis and advice tailored to your investment needs.' +) +finance_interface.launch() +``` + +### Healthcare Agent Example + +This example sets up a chat interface for healthcare assistance: + +```python +import gradio as gr +import ai_gradio + +healthcare_interface = gr.load( + name='swarms:gpt-4-turbo', + src=ai_gradio.registry, + agent_name="Medical-Assistant-Agent", + title='Healthcare Assistant', + description='Access medical information, symptom analysis, and treatment recommendations.' +) +healthcare_interface.launch() +``` + +### News Analysis Agent Example + +This example creates an interface for real-time news analysis: + +```python +import gradio as gr +import ai_gradio + +news_interface = gr.load( + name='swarms:gpt-4-turbo', + src=ai_gradio.registry, + agent_name="News-Analysis-Agent", + title='News Analyzer', + description='Get real-time insights and analysis of trending news topics.' +) +news_interface.launch() +``` + +--- + +## Setup and Deployment + +1. **Install Dependencies:** + Make sure all required packages are installed. + + ```bash + pip install gradio ai-gradio swarms + ``` + +2. 
**Import Modules:** + Import Gradio and ai_gradio in your Python script. + + ```python + import gradio as gr + import ai_gradio + ``` + +3. **Configure and Launch the Interface:** + Configure your interface with the desired parameters and then launch. + + ```python + interface = gr.load( + name='swarms:gpt-4-turbo', + src=ai_gradio.registry, + agent_name="Your-Desired-Agent", + title='Your Interface Title', + description='A brief description of your interface.' + ) + interface.launch() + ``` + +4. **Deployment Options:** + - **Local:** By default, the interface runs at [http://localhost:7860](http://localhost:7860). + - **Cloud Deployment:** Use cloud platforms like Heroku, AWS, or Google Cloud for remote access. + - **Concurrent Sessions:** The system supports multiple users at the same time. Monitor resources and use proper scaling. + +--- + +## Best Practices + +1. **Select the Right Agent:** + Use the agent that best suits your specific domain needs. + +2. **Model Configuration:** + Adjust model parameters based on your computational resources to balance performance and cost. + +3. **Error Handling:** + Implement error handling to manage unexpected inputs or API failures gracefully. + +4. **Resource Monitoring:** + Keep an eye on system performance, especially during high-concurrency sessions. + +5. **Regular Updates:** + Keep your Swarms and Gradio packages updated to ensure compatibility with new features and security patches. + +--- + +## Notes + +- **Local vs. Remote:** + The interface runs locally by default but can be deployed on remote servers for wider accessibility. + +- **Customization:** + You can configure custom model parameters and integrate additional APIs as needed. + +- **Session Management:** + Built-in session handling ensures that users can interact concurrently without interfering with each other's sessions. + +- **Error Handling & Rate Limiting:** + The system includes basic error handling and rate limiting to maintain performance under load. + +--- + +This documentation is designed to provide clarity, reliability, and comprehensive guidance for integrating and using the Swarms Chat UI. For further customization or troubleshooting, consult the respective package documentation and community forums. \ No newline at end of file diff --git a/examples/hackathon_feb16/fraud.py b/examples/hackathon_feb16/fraud.py new file mode 100644 index 00000000..189bea86 --- /dev/null +++ b/examples/hackathon_feb16/fraud.py @@ -0,0 +1,265 @@ +import csv +import json +from swarms import Agent + + +############################################################################### +# FraudClassifier Class Definition +############################################################################### +class FraudClassifier: + def __init__(self, model_name="gpt-4o-mini"): + """ + Initialize the system prompts and all agent instances. + """ + + # ------------------ Boss Agent Prompt ------------------ # + self.BOSS_AGENT_SYS_PROMPT = """ + You are the Boss Agent. Your role is to orchestrate fraud analysis. + + First, you will receive a full CSV row as a string. Your task is to parse the row + into its component fields: declared_country, ip_country, phone_carrier_country, ip_address, + known_blacklisted_regions, user_name, email, account_name, payment_info_name, account_history_notes. + + Then, you should instruct which specialized agents to call: + - For location data (declared_country, ip_country, phone_carrier_country): Geolocation Agent. 
+ - For IP data (ip_address, known_blacklisted_regions): IP Agent. + - For account data (user_name, email, account_name, payment_info_name, account_history_notes): Email Agent. + + Respond with your instructions in JSON format like: + { + "geolocation_data": "Concise location info", + "ip_data": "Concise ip info", + "email_data": "Concise account info" + } + + After you receive the specialized agents’ responses, you will be given the full row + again along with the sub-agent results. Then produce a final JSON in the following format: + { + "final_suspicious": bool, + "details": [ + { "agent_name": "GeolocationAgent", "is_suspicious": bool, "reason": "..." }, + { "agent_name": "IPAgent", "is_suspicious": bool, "reason": "..." }, + { "agent_name": "EmailAgent", "is_suspicious": bool, "reason": "..." } + ], + "overall_reason": "Short summary" + } + """ + + # ------------------ Specialized Agent Prompts ------------------ # + self.GEOLOCATION_AGENT_SYS_PROMPT = """ + You are the Geolocation Agent. + Your input is location-related data (declared_country, ip_country, phone_carrier_country). + Decide if there is a suspicious mismatch. + Return a JSON in the following format: + { + "is_suspicious": true or false, + "reason": "Short reason" + } + """ + + self.IP_AGENT_SYS_PROMPT = """ + You are the IP Agent. + Your input includes ip_address and known_blacklisted_regions. + Decide if the IP is suspicious (e.g., blacklisted or unusual). + Return a JSON in the following format: + { + "is_suspicious": true or false, + "reason": "Short reason" + } + """ + + self.EMAIL_AGENT_SYS_PROMPT = """ + You are the Email/Account Agent. + Your input includes user_name, email, account_name, payment_info_name, and account_history_notes. + Decide if the account data is suspicious (e.g., name mismatches or new account flags). 
+ Return a JSON in the following format: + { + "is_suspicious": true or false, + "reason": "Short reason" + } + """ + + # ------------------ Initialize Agents ------------------ # + self.boss_agent = Agent( + agent_name="Boss-Agent", + system_prompt=self.BOSS_AGENT_SYS_PROMPT, + model_name=model_name, + max_loops=2, + autosave=False, + dashboard=False, + verbose=False, + dynamic_temperature_enabled=False, + saved_state_path="boss_agent.json", + user_name="swarms_corp", + retry_attempts=1, + context_length=200000, + return_step_meta=False, + output_type="json", # Expect JSON output + streaming_on=False, + ) + + self.geolocation_agent = Agent( + agent_name="Geolocation-Agent", + system_prompt=self.GEOLOCATION_AGENT_SYS_PROMPT, + model_name=model_name, + max_loops=1, + autosave=False, + dashboard=False, + verbose=False, + dynamic_temperature_enabled=False, + saved_state_path="geolocation_agent.json", + user_name="swarms_corp", + retry_attempts=1, + context_length=200000, + return_step_meta=False, + output_type="json", + streaming_on=False, + ) + + self.ip_agent = Agent( + agent_name="IP-Agent", + system_prompt=self.IP_AGENT_SYS_PROMPT, + model_name=model_name, + max_loops=1, + autosave=False, + dashboard=False, + verbose=False, + dynamic_temperature_enabled=False, + saved_state_path="ip_agent.json", + user_name="swarms_corp", + retry_attempts=1, + context_length=200000, + return_step_meta=False, + output_type="json", + streaming_on=False, + ) + + self.email_agent = Agent( + agent_name="Email-Agent", + system_prompt=self.EMAIL_AGENT_SYS_PROMPT, + model_name=model_name, + max_loops=1, + autosave=False, + dashboard=False, + verbose=False, + dynamic_temperature_enabled=False, + saved_state_path="email_agent.json", + user_name="swarms_corp", + retry_attempts=1, + context_length=200000, + return_step_meta=False, + output_type="json", + streaming_on=False, + ) + + def classify_row(self, row: dict) -> dict: + """ + For a given CSV row (as a dict): + 1. Concatenate the entire row into a single string. + 2. Send that string to the Boss Agent to instruct how to parse and dispatch to sub‑agents. + 3. Call the specialized agents with the appropriate data. + 4. Send the sub‑agent results back to the Boss Agent for a final decision. + """ + # (a) Concatenate entire row into one string. + row_string = " ".join(f"{k}: {v}" for k, v in row.items()) + + # (b) Send row to Boss Agent for parsing/instructions. + initial_prompt = f""" + Here is a CSV row data: + {row_string} + + Please parse the row into its fields: + declared_country, ip_country, phone_carrier_country, ip_address, known_blacklisted_regions, user_name, email, account_name, payment_info_name, account_history_notes. + Then, provide your instructions (in JSON) on what to send to the sub-agents. + For example: + {{ + "geolocation_data": "declared_country, ip_country, phone_carrier_country", + "ip_data": "ip_address, known_blacklisted_regions", + "email_data": "user_name, email, account_name, payment_info_name, account_history_notes" + }} + """ + boss_instructions_raw = self.boss_agent.run(initial_prompt) + try: + boss_instructions = json.loads(boss_instructions_raw) + except Exception: + # If parsing fails, we fall back to manually constructing the sub-agent inputs. 
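            # The hand-built fallback mirrors the JSON shape the boss agent was
            # asked to emit (geolocation_data / ip_data / email_data), so the
            # specialized-agent calls below receive consistent inputs whether
            # or not the boss agent produced parseable JSON.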
+ boss_instructions = { + "geolocation_data": f"declared_country: {row.get('declared_country','')}, ip_country: {row.get('ip_country','')}, phone_carrier_country: {row.get('phone_carrier_country','')}", + "ip_data": f"ip_address: {row.get('ip_address','')}, known_blacklisted_regions: {row.get('known_blacklisted_regions','')}", + "email_data": f"user_name: {row.get('user_name','')}, email: {row.get('email','')}, account_name: {row.get('account_name','')}, payment_info_name: {row.get('payment_info_name','')}, account_history_notes: {row.get('account_history_notes','')}", + } + + # (c) Call specialized agents using either the Boss Agent's instructions or defaults. + geo_result = self.geolocation_agent.run( + boss_instructions.get("geolocation_data", "") + ) + ip_result = self.ip_agent.run( + boss_instructions.get("ip_data", "") + ) + email_result = self.email_agent.run( + boss_instructions.get("email_data", "") + ) + + # (d) Consolidate specialized agent results as JSON. + specialized_results = { + "GeolocationAgent": geo_result, + "IPAgent": ip_result, + "EmailAgent": email_result, + } + specialized_results_json = json.dumps(specialized_results) + + # (e) Send the original row data and the specialized results back to the Boss Agent. + final_prompt = f""" + Here is the original CSV row data: + {row_string} + + Here are the results from the specialized agents: + {specialized_results_json} + + Based on this information, produce the final fraud classification JSON in the format: + {{ + "final_suspicious": bool, + "details": [ + {{ "agent_name": "GeolocationAgent", "is_suspicious": bool, "reason": "..." }}, + {{ "agent_name": "IPAgent", "is_suspicious": bool, "reason": "..." }}, + {{ "agent_name": "EmailAgent", "is_suspicious": bool, "reason": "..." }} + ], + "overall_reason": "Short summary" + }} + """ + final_result_raw = self.boss_agent.run(final_prompt) + try: + final_result = json.loads(final_result_raw) + except Exception: + final_result = {"final_result_raw": final_result_raw} + return final_result + + def classify_csv(self, csv_file_path: str): + """ + Load a CSV file, iterate over each row, run the classification, and return the results. + """ + results = [] + with open(csv_file_path, "r", encoding="utf-8") as f: + reader = csv.DictReader(f) + for row in reader: + result = self.classify_row(row) + results.append(result) + return results + + +############################################################################### +# Example Usage +############################################################################### +if __name__ == "__main__": + # Create an instance of the FraudClassifier. + classifier = FraudClassifier() + + # Specify your CSV file (e.g., "fraud_data.csv") + csv_file = "fraud_data.csv" + all_reports = classifier.classify_csv(csv_file) + + # Print the final fraud classification for each row. + for idx, report in enumerate(all_reports, start=1): + print( + f"Row {idx} classification:\n{json.dumps(report, indent=2)}\n" + ) diff --git a/examples/hackathon_feb16/gassisan_splat.py b/examples/hackathon_feb16/gassisan_splat.py new file mode 100644 index 00000000..cac8f218 --- /dev/null +++ b/examples/hackathon_feb16/gassisan_splat.py @@ -0,0 +1,311 @@ +import torch +from torch import Tensor +from loguru import logger +from typing import Tuple +import matplotlib.pyplot as plt + +try: + # ipywidgets is available in interactive environments like Jupyter. 
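    # Outside a notebook this import fails, in which case the script falls
    # back to the static maximum-intensity-projection (MIP) view defined below.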
+ from ipywidgets import interact, IntSlider + + HAS_IPYWIDGETS = True +except ImportError: + HAS_IPYWIDGETS = False + logger.warning( + "ipywidgets not installed. Interactive slicing will be disabled." + ) + + +class GaussianSplat4DStateSpace: + """ + 4D Gaussian splatting with a state space model in PyTorch. + + Each Gaussian is defined by an 8D state vector: + [x, y, z, w, vx, vy, vz, vw], + where the first four dimensions are the spatial coordinates and the last + four are the velocities. Only the spatial (first four) dimensions are used + for the 4D Gaussian splat, with a corresponding 4×4 covariance matrix. + + Attributes: + num_gaussians (int): Number of Gaussians. + state_dim (int): Dimension of the state vector (should be 8). + states (Tensor): Current state for each Gaussian of shape (num_gaussians, state_dim). + covariances (Tensor): Covariance matrices for the spatial dimensions, shape (num_gaussians, 4, 4). + A (Tensor): State transition matrix of shape (state_dim, state_dim). + dt (float): Time step for state updates. + """ + + def __init__( + self, + num_gaussians: int, + init_states: Tensor, + init_covariances: Tensor, + dt: float = 1.0, + ) -> None: + """ + Initialize the 4D Gaussian splat model. + + Args: + num_gaussians (int): Number of Gaussians. + init_states (Tensor): Initial states of shape (num_gaussians, 8). + Each state is assumed to be + [x, y, z, w, vx, vy, vz, vw]. + init_covariances (Tensor): Initial covariance matrices for the spatial dimensions, + shape (num_gaussians, 4, 4). + dt (float): Time step for the state update. + """ + if init_states.shape[1] != 8: + raise ValueError( + "init_states should have shape (N, 8) where 8 = 4 position + 4 velocity." + ) + if init_covariances.shape[1:] != (4, 4): + raise ValueError( + "init_covariances should have shape (N, 4, 4)." + ) + + self.num_gaussians = num_gaussians + self.states = init_states.clone() # shape: (N, 8) + self.covariances = ( + init_covariances.clone() + ) # shape: (N, 4, 4) + self.dt = dt + self.state_dim = init_states.shape[1] + + # Create an 8x8 constant-velocity state transition matrix: + # New position = position + velocity*dt, velocity remains unchanged. + I4 = torch.eye( + 4, dtype=init_states.dtype, device=init_states.device + ) + zeros4 = torch.zeros( + (4, 4), dtype=init_states.dtype, device=init_states.device + ) + top = torch.cat([I4, dt * I4], dim=1) + bottom = torch.cat([zeros4, I4], dim=1) + self.A = torch.cat([top, bottom], dim=0) # shape: (8, 8) + + logger.info( + "Initialized 4D GaussianSplatStateSpace with {} Gaussians.", + num_gaussians, + ) + + def update_states(self) -> None: + """ + Update the state of each Gaussian using the constant-velocity state space model. + + Applies: + state_next = A @ state_current. + """ + self.states = ( + self.A @ self.states.t() + ).t() # shape: (num_gaussians, 8) + logger.debug("States updated: {}", self.states) + + def _compute_gaussian( + self, pos: Tensor, cov: Tensor, coords: Tensor + ) -> Tensor: + """ + Compute the 4D Gaussian function over a grid of coordinates. + + Args: + pos (Tensor): The center of the Gaussian (4,). + cov (Tensor): The 4×4 covariance matrix. + coords (Tensor): A grid of coordinates of shape (..., 4). + + Returns: + Tensor: Evaluated Gaussian values on the grid with shape equal to coords.shape[:-1]. + """ + try: + cov_inv = torch.linalg.inv(cov) + except RuntimeError as e: + logger.warning( + "Covariance inversion failed; using pseudo-inverse. 
Error: {}", + e, + ) + cov_inv = torch.linalg.pinv(cov) + + # Broadcast pos over the grid + diff = coords - pos.view( + *(1 for _ in range(coords.ndim - 1)), 4 + ) + mahal = torch.einsum("...i,ij,...j->...", diff, cov_inv, diff) + gaussian = torch.exp(-0.5 * mahal) + return gaussian + + def render( + self, + canvas_size: Tuple[int, int, int, int], + sigma_scale: float = 1.0, + normalize: bool = False, + ) -> Tensor: + """ + Render the current 4D Gaussian splats onto a 4D canvas. + + Args: + canvas_size (Tuple[int, int, int, int]): The size of the canvas (d1, d2, d3, d4). + sigma_scale (float): Scaling factor for the covariance (affects spread). + normalize (bool): Whether to normalize the final canvas to [0, 1]. + + Returns: + Tensor: A 4D tensor (canvas) with the accumulated contributions from all Gaussians. + """ + d1, d2, d3, d4 = canvas_size + + # Create coordinate grids for each dimension. + grid1 = torch.linspace( + 0, d1 - 1, d1, device=self.states.device + ) + grid2 = torch.linspace( + 0, d2 - 1, d2, device=self.states.device + ) + grid3 = torch.linspace( + 0, d3 - 1, d3, device=self.states.device + ) + grid4 = torch.linspace( + 0, d4 - 1, d4, device=self.states.device + ) + + # Create a 4D meshgrid (using indexing "ij") + grid = torch.stack( + torch.meshgrid(grid1, grid2, grid3, grid4, indexing="ij"), + dim=-1, + ) # shape: (d1, d2, d3, d4, 4) + + # Initialize the canvas. + canvas = torch.zeros( + (d1, d2, d3, d4), + dtype=self.states.dtype, + device=self.states.device, + ) + + for i in range(self.num_gaussians): + pos = self.states[i, :4] # spatial center (4,) + cov = ( + self.covariances[i] * sigma_scale + ) # scaled covariance + gaussian = self._compute_gaussian(pos, cov, grid) + canvas += gaussian + logger.debug( + "Rendered Gaussian {} at position {}", i, pos.tolist() + ) + + if normalize: + max_val = canvas.max() + if max_val > 0: + canvas = canvas / max_val + logger.debug("Canvas normalized.") + + logger.info("4D Rendering complete.") + return canvas + + +def interactive_slice(canvas: Tensor) -> None: + """ + Display an interactive 2D slice of the 4D canvas using ipywidgets. + + This function fixes two of the four dimensions (d3 and d4) via sliders and + displays the resulting 2D slice (over dimensions d1 and d2). + + Args: + canvas (Tensor): A 4D tensor with shape (d1, d2, d3, d4). + """ + d1, d2, d3, d4 = canvas.shape + + def display_slice(slice_d3: int, slice_d4: int): + slice_2d = canvas[:, :, slice_d3, slice_d4].cpu().numpy() + plt.figure(figsize=(6, 6)) + plt.imshow(slice_2d, cmap="hot", origin="lower") + plt.title(f"2D Slice at d3={slice_d3}, d4={slice_d4}") + plt.colorbar() + plt.show() + + interact( + display_slice, + slice_d3=IntSlider(min=0, max=d3 - 1, step=1, value=d3 // 2), + slice_d4=IntSlider(min=0, max=d4 - 1, step=1, value=d4 // 2), + ) + + +def mip_projection(canvas: Tensor) -> None: + """ + Render a 2D view of the 4D canvas using maximum intensity projection (MIP) + along the 3rd and 4th dimensions. + + Args: + canvas (Tensor): A 4D tensor with shape (d1, d2, d3, d4). + """ + # MIP along dimension 3 + mip_3d = canvas.max(dim=2)[0] # shape: (d1, d2, d4) + # MIP along dimension 4 + mip_2d = mip_3d.max(dim=2)[0] # shape: (d1, d2) + + plt.figure(figsize=(6, 6)) + plt.imshow(mip_2d.cpu().numpy(), cmap="hot", origin="lower") + plt.title("2D MIP (Projecting dimensions d3 and d4)") + plt.colorbar() + plt.show() + + +def main() -> None: + """ + Main function that: + - Creates a 4D Gaussian splat model. + - Updates the states to simulate motion. 
+ - Renders a 4D canvas. + - Visualizes the 4D volume via interactive slicing (if available) or MIP. + """ + torch.manual_seed(42) + num_gaussians = 2 + + # Define initial states for each Gaussian: + # Each state is [x, y, z, w, vx, vy, vz, vw]. + init_states = torch.tensor( + [ + [10.0, 15.0, 20.0, 25.0, 0.5, -0.2, 0.3, 0.1], + [30.0, 35.0, 40.0, 45.0, -0.3, 0.4, -0.1, 0.2], + ], + dtype=torch.float32, + ) + + # Define initial 4x4 covariance matrices for the spatial dimensions. + init_covariances = torch.stack( + [ + torch.diag( + torch.tensor( + [5.0, 5.0, 5.0, 5.0], dtype=torch.float32 + ) + ), + torch.diag( + torch.tensor( + [3.0, 3.0, 3.0, 3.0], dtype=torch.float32 + ) + ), + ] + ) + + # Create the 4D Gaussian splat model. + model = GaussianSplat4DStateSpace( + num_gaussians, init_states, init_covariances, dt=1.0 + ) + + # Update states to simulate one time step. + model.update_states() + + # Render the 4D canvas. + canvas_size = (20, 20, 20, 20) + canvas = model.render( + canvas_size, sigma_scale=1.0, normalize=True + ) + + # Visualize the 4D data. + if HAS_IPYWIDGETS: + logger.info("Launching interactive slicing tool for 4D data.") + interactive_slice(canvas) + else: + logger.info( + "ipywidgets not available; using maximum intensity projection instead." + ) + mip_projection(canvas) + + +if __name__ == "__main__": + main() diff --git a/examples/hackathon_feb16/sarasowti.py b/examples/hackathon_feb16/sarasowti.py new file mode 100644 index 00000000..5943d47f --- /dev/null +++ b/examples/hackathon_feb16/sarasowti.py @@ -0,0 +1,257 @@ +from dotenv import load_dotenv +from swarms import Agent +from swarms.utils.function_caller_model import OpenAIFunctionCaller +from pydantic import BaseModel, Field +from swarms.structs.conversation import Conversation + +# Load environment variables +load_dotenv() + +######################################## +# Define enhanced custom system prompts as strings +######################################## + + +class CallLog(BaseModel): + response_to_user: str = Field( + description="The response to the user's query" + ) + agent_name: str = Field( + description="The name of the agent to call" + ) + task: str = Field(description="The task to call the agent for") + + +MASTER_AGENT_SYS_PROMPT = """ +You are SARASWATI, the Master Orchestrator Agent of a sophisticated multi-agent system dedicated to revolutionizing college application guidance for high school students. + +You have two specialized agents under your command: + +1. Counselor Agent ("Counselor-Agent"): +- Expert in college admissions and academic guidance +- Use when students need practical advice about college selection, applications, academics, career planning, or financial aid +- Deploy for specific questions about admission requirements, essay writing, test prep, or college research +- Best for structured, information-heavy guidance + +2. Buddy Agent ("Buddy-Agent"): +- Supportive peer mentor focused on emotional wellbeing +- Use when students show signs of stress, anxiety, or need motivational support +- Deploy for confidence building, stress management, and general encouragement +- Best for emotional support and maintaining student morale + +Your core responsibilities include: + +1. 
Strategic Oversight and Coordination: +- Analyze student inputs holistically to determine which specialized agent is best suited to respond +- Maintain coherent conversation flow by seamlessly transitioning between agents +- Track conversation history and ensure consistent guidance across interactions +- Identify critical decision points requiring multi-agent collaboration + +2. Emotional Intelligence and Support Assessment: +- Monitor student sentiment and emotional state through language analysis +- Deploy the Buddy Agent for emotional support when stress indicators are detected +- Escalate to the Counselor Agent for professional guidance when specific concerns arise +- Ensure a balanced approach between emotional support and practical advice + +3. Progress Tracking and Optimization: +- Maintain detailed records of student progress, concerns, and milestone achievements +- Identify patterns in student engagement and adjust agent deployment accordingly +- Generate comprehensive progress reports for review +- Recommend personalized intervention strategies based on student performance + +4. Quality Control and Coordination: +- Evaluate the effectiveness of each agent's interactions +- Provide real-time feedback to optimize agent responses +- Ensure all advice aligns with current college admission trends and requirements +- Maintain consistency in guidance across all agent interactions + +5. Resource Management: +- Curate and distribute relevant resources based on student needs +- Coordinate information sharing between agents +- Maintain an updated knowledge base of college admission requirements +- Track and optimize resource utilization + +Your communication must be authoritative yet approachable, demonstrating both leadership and empathy. +""" + +SUPERVISOR_AGENT_SYS_PROMPT = """ +You are the Supervisor Agent for SARASWATI, an advanced multi-agent system dedicated to guiding high school students through the college application process. Your comprehensive responsibilities include: + +1. Interaction Monitoring: +- Real-time analysis of all agent-student conversations +- Detection of communication gaps or misalignments +- Assessment of information accuracy and relevance +- Identification of opportunities for deeper engagement + +2. Performance Evaluation: +- Detailed analysis of conversation transcripts +- Assessment of emotional intelligence in responses +- Evaluation of advice quality and actionability +- Measurement of student engagement and response + +3. Strategic Coordination: +- Synchronization of Counselor and Buddy agent activities +- Implementation of intervention strategies when needed +- Optimization of information flow between agents +- Development of personalized support frameworks + +4. Quality Improvement: +- Generation of detailed performance metrics +- Implementation of corrective measures +- Documentation of best practices +- Continuous refinement of interaction protocols + +Maintain unwavering focus on optimizing the student's journey while ensuring all guidance is accurate, timely, and constructive. +""" + +COUNSELOR_AGENT_SYS_PROMPT = """ +You are the eCounselor Agent for SARASWATI, embodying the role of an expert high school counselor with deep knowledge of the college admission process. Your comprehensive responsibilities include: + +1. 
Academic Assessment and Planning: +- Detailed evaluation of academic performance and course selection +- Strategic planning for standardized test preparation +- Development of personalized academic improvement strategies +- Guidance on advanced placement and honors courses + +2. College Selection Guidance: +- Analysis of student preferences and capabilities +- Research on suitable college options +- Evaluation of admission probability +- Development of balanced college lists + +3. Application Strategy: +- Timeline creation and milestone tracking +- Essay topic brainstorming and refinement +- Extracurricular activity optimization +- Application component prioritization + +4. Career and Major Exploration: +- Interest and aptitude assessment +- Career pathway analysis +- Major selection guidance +- Industry trend awareness + +5. Financial Planning Support: +- Scholarship opportunity identification +- Financial aid application guidance +- Cost-benefit analysis of college options +- Budget planning assistance + +Maintain a professional yet approachable demeanor, ensuring all advice is practical, current, and tailored to each student's unique situation. +""" + +BUDDY_AGENT_SYS_PROMPT = """ +You are the Buddy Agent for SARASWATI, designed to be a supportive peer mentor for students navigating the college application process. Your extensive responsibilities include: + +1. Emotional Support: +- Active listening and validation of feelings +- Stress management guidance +- Confidence building +- Anxiety reduction techniques + +2. Motivational Guidance: +- Goal setting assistance +- Progress celebration +- Resilience building +- Positive reinforcement + +3. Personal Development: +- Time management strategies +- Study habit optimization +- Work-life balance advice +- Self-care promotion + +4. Social Support: +- Peer pressure management +- Family expectation navigation +- Social anxiety addressing +- Community building guidance + +5. Communication Facilitation: +- Open dialogue encouragement +- Question asking promotion +- Feedback solicitation +- Concern articulation support + +Maintain a warm, friendly, and authentic presence while ensuring all interactions promote student well-being and success. 
+""" + +######################################## +# Initialize Agents using swarms +######################################## + +model = OpenAIFunctionCaller( + base_model=CallLog, + system_prompt=MASTER_AGENT_SYS_PROMPT, +) + + +# Counselor Agent +counselor_agent = Agent( + agent_name="Counselor-Agent", + agent_description="Provides empathetic and effective college counseling and guidance.", + system_prompt=COUNSELOR_AGENT_SYS_PROMPT, + max_loops=1, + model_name="gpt-4o", + dynamic_temperature_enabled=True, +) + +# Buddy Agent +buddy_agent = Agent( + agent_name="Buddy-Agent", + agent_description="Acts as a supportive, friendly companion to the student.", + system_prompt=BUDDY_AGENT_SYS_PROMPT, + max_loops=1, + model_name="gpt-4o", + dynamic_temperature_enabled=True, +) + + +worker_agents = [counselor_agent, buddy_agent] + + +class Swarm: + def __init__( + self, + agents: list = [counselor_agent, buddy_agent], + max_loops: int = 1, + ): + self.agents = agents + self.max_loops = max_loops + self.conversation = Conversation() + + def step(self, task: str): + + self.conversation.add(role="User", content=task) + + function_call = model.run(task) + + self.conversation.add( + role="Master-SARASWATI", content=function_call + ) + + print(function_call) + print(type(function_call)) + + agent_name = function_call.agent_name + agent_task = function_call.task + + agent = self.find_agent_by_name(agent_name) + + worker_output = agent.run(task=agent_task) + + self.conversation.add(role=agent_name, content=worker_output) + + return self.conversation.return_history_as_string() + + def find_agent_by_name(self, name: str): + for agent in self.agents: + if agent.agent_name == name: + return agent + return None + + +swarm = Swarm() +swarm.step( + "Hey, I am a high school student and I am looking for a college to apply to." +) diff --git a/examples/hackathon_feb16/swarms_of_browser_agents.py b/examples/hackathon_feb16/swarms_of_browser_agents.py new file mode 100644 index 00000000..31d8520b --- /dev/null +++ b/examples/hackathon_feb16/swarms_of_browser_agents.py @@ -0,0 +1,42 @@ +import asyncio + +from browser_use import Agent +from dotenv import load_dotenv +from langchain_openai import ChatOpenAI + +from swarms import ConcurrentWorkflow + +load_dotenv() + + +class BrowserAgent: + def __init__(self, agent_name: str = "BrowserAgent"): + self.agent_name = agent_name + + async def browser_agent_test(self, task: str): + agent = Agent( + task=task, + llm=ChatOpenAI(model="gpt-4o"), + ) + result = await agent.run() + return result + + def run(self, task: str): + return asyncio.run(self.browser_agent_test(task)) + + +swarm = ConcurrentWorkflow( + agents=[BrowserAgent() for _ in range(10)], +) + +swarm.run( + """Please navigate to chat.com and engage in a detailed technical discussion with ChatGPT about the following specific aspects of future high-energy physics: + +1. The potential discoveries and physics reach of the Future Circular Collider (FCC) compared to the LHC +2. Theoretical predictions for supersymmetric particles and dark matter candidates at energy scales above 100 TeV +3. Novel detector technologies needed for future collider experiments, particularly for tracking and calorimetry +4. The role of machine learning and quantum computing in analyzing high-energy physics data +5. 
Challenges and proposed solutions for beam focusing and acceleration at extremely high energies + +Please document the key insights and technical details from this discussion.""" +) diff --git a/pyproject.toml b/pyproject.toml index 80b11403..716c6fb1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "7.2.2" +version = "7.2.4" description = "Swarms - TGSC" license = "MIT" authors = ["Kye Gomez "] diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 783206ad..de2dcdca 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -24,6 +24,7 @@ import yaml from loguru import logger from pydantic import BaseModel +from swarms.agents.agent_print import agent_print from swarms.agents.ape_agent import auto_generate_prompt from swarms.artifacts.main_artifact import Artifact from swarms.prompts.agent_system_prompts import AGENT_SYSTEM_PROMPT_3 @@ -39,19 +40,20 @@ from swarms.schemas.base_schemas import ( ) from swarms.structs.concat import concat_strings from swarms.structs.conversation import Conversation + +# from swarms.structs.multi_agent_exec import run_agents_concurrently from swarms.structs.safe_loading import ( SafeLoaderUtils, SafeStateManager, ) +from swarms.telemetry.main import log_agent_data from swarms.tools.base_tool import BaseTool from swarms.tools.tool_parse_exec import parse_and_execute_json from swarms.utils.data_to_text import data_to_text from swarms.utils.file_processing import create_file_in_folder from swarms.utils.formatter import formatter -from swarms.utils.pdf_to_text import pdf_to_text -from swarms.telemetry.main import log_agent_data -from swarms.agents.agent_print import agent_print from swarms.utils.litellm_tokenizer import count_tokens +from swarms.utils.pdf_to_text import pdf_to_text # Utils @@ -2560,3 +2562,38 @@ class Agent: return formatter.print_table( f"Agent: {self.agent_name} Configuration", config_dict ) + + def talk_to( + self, agent: Any, task: str, img: str = None, *args, **kwargs + ) -> Any: + """ + Talk to another agent. + """ + # return agent.run(f"{agent.agent_name}: {task}", img, *args, **kwargs) + output = self.run( + f"{self.agent_name}: {task}", img, *args, **kwargs + ) + + return agent.run( + task=f"From {self.agent_name}: {output}", + img=img, + *args, + **kwargs, + ) + + def talk_to_multiple_agents( + self, + agents: List[Union[Any, Callable]], + task: str, + *args, + **kwargs, + ) -> Any: + """ + Talk to multiple agents. + """ + outputs = [] + for agent in agents: + output = self.talk_to(agent, task, *args, **kwargs) + outputs.append(output) + + return outputs diff --git a/swarms/structs/airflow_swarm.py b/swarms/structs/airflow_swarm.py new file mode 100644 index 00000000..84ea68c7 --- /dev/null +++ b/swarms/structs/airflow_swarm.py @@ -0,0 +1,430 @@ +import subprocess +import sys +import uuid +import threading +from concurrent.futures import ( + FIRST_COMPLETED, + ThreadPoolExecutor, + wait, +) +from dataclasses import dataclass +from datetime import datetime, timedelta +from enum import Enum +from typing import Any, Dict, List, Optional, Set, Union +from graphviz import Digraph +from loguru import logger + +# Airflow imports +try: + from airflow import DAG + from airflow.operators.python import PythonOperator +except ImportError: + logger.error( + "Airflow is not installed. Please install it using 'pip install apache-airflow'." 
+ ) + subprocess.run( + [sys.executable, "-m", "pip", "install", "apache-airflow"] + ) + from airflow import DAG + from airflow.operators.python import PythonOperator + +# Import the real Agent from swarms. +from swarms.structs.conversation import Conversation + + +class NodeType(Enum): + AGENT = "agent" + CALLABLE = "callable" + TOOL = "tool" + + +def dag_id(): + return uuid.uuid4().hex + + +@dataclass +class Node: + """Represents a node in the DAG""" + + id: str + type: NodeType + component: Any # Agent, Callable, or Tool + query: Optional[str] = None + args: Optional[List[Any]] = None + kwargs: Optional[Dict[str, Any]] = None + concurrent: bool = False + + +# ======= Airflow DAG Swarm Class ========= +class AirflowDAGSwarm: + """ + A simplified and more intuitive DAG-based swarm for orchestrating agents, callables, and tools. + Provides an easy-to-use API for building agent pipelines with support for concurrent execution. + """ + + def __init__( + self, + dag_id: str = dag_id(), + description: str = "A DAG Swarm for Airflow", + name: str = "Airflow DAG Swarm", + schedule_interval: Union[timedelta, str] = timedelta(days=1), + start_date: datetime = datetime(2025, 2, 14), + default_args: Optional[Dict[str, Any]] = None, + initial_message: Optional[str] = None, + max_workers: int = 5, + ): + """Initialize the AirflowDAGSwarm with improved configuration.""" + self.dag_id = dag_id + self.name = name + self.description = description + self.max_workers = max_workers + + self.default_args = default_args or { + "owner": "airflow", + "depends_on_past": False, + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": timedelta(minutes=5), + } + + self.dag = DAG( + dag_id=dag_id, + default_args=self.default_args, + schedule_interval=schedule_interval, + start_date=start_date, + catchup=False, + ) + + self.nodes: Dict[str, Node] = {} + self.edges: Dict[str, Set[str]] = ( + {} + ) # node_id -> set of child node_ids + + # Initialize conversation + self.conversation = Conversation() + if initial_message: + self.conversation.add("user", initial_message) + + self.lock = threading.Lock() + + def add_user_message(self, message: str) -> None: + """Add a user message to the conversation.""" + with self.lock: + self.conversation.add("user", message) + logger.info(f"Added user message: {message}") + + def get_conversation_history(self) -> str: + """Get the conversation history as JSON.""" + return self.conversation.to_json() + + def add_node( + self, + node_id: str, + component: Any, + node_type: NodeType, + query: Optional[str] = None, + args: Optional[List[Any]] = None, + kwargs: Optional[Dict[str, Any]] = None, + concurrent: bool = False, + ) -> str: + """ + Add a node to the DAG with improved type checking and validation. 
+ + Args: + node_id: Unique identifier for the node + component: Agent, callable, or tool to execute + node_type: Type of the node (AGENT, CALLABLE, or TOOL) + query: Query string for agents + args: Positional arguments for callables/tools + kwargs: Keyword arguments for callables/tools + concurrent: Whether to execute this node concurrently + + Returns: + node_id: The ID of the created node + """ + if node_id in self.nodes: + raise ValueError(f"Node with ID {node_id} already exists") + + if node_type == NodeType.AGENT and not hasattr( + component, "run" + ): + raise ValueError("Agent must have a 'run' method") + elif node_type in ( + NodeType.CALLABLE, + NodeType.TOOL, + ) and not callable(component): + raise ValueError(f"{node_type.value} must be callable") + + node = Node( + id=node_id, + type=node_type, + component=component, + query=query, + args=args or [], + kwargs=kwargs or {}, + concurrent=concurrent, + ) + + self.nodes[node_id] = node + self.edges[node_id] = set() + logger.info(f"Added {node_type.value} node: {node_id}") + return node_id + + def add_edge(self, from_node: str, to_node: str) -> None: + """ + Add a directed edge between two nodes in the DAG. + + Args: + from_node: ID of the source node + to_node: ID of the target node + """ + if from_node not in self.nodes or to_node not in self.nodes: + raise ValueError("Both nodes must exist in the DAG") + + self.edges[from_node].add(to_node) + logger.info(f"Added edge: {from_node} -> {to_node}") + + def _execute_node(self, node: Node) -> str: + """Execute a single node and return its output.""" + try: + if node.type == NodeType.AGENT: + query = ( + node.query + or self.conversation.get_last_message_as_string() + or "Default query" + ) + logger.info( + f"Executing agent node {node.id} with query: {query}" + ) + return node.component.run(query) + + elif node.type in (NodeType.CALLABLE, NodeType.TOOL): + logger.info( + f"Executing {node.type.value} node {node.id}" + ) + return node.component( + *node.args, + conversation=self.conversation, + **node.kwargs, + ) + except Exception as e: + logger.exception(f"Error executing node {node.id}: {e}") + return f"Error in node {node.id}: {str(e)}" + + def _get_root_nodes(self) -> List[str]: + """Get nodes with no incoming edges.""" + all_nodes = set(self.nodes.keys()) + nodes_with_incoming = { + node for edges in self.edges.values() for node in edges + } + return list(all_nodes - nodes_with_incoming) + + def run(self, **context: Any) -> str: + """ + Execute the DAG with improved concurrency handling and error recovery. 

        Returns:
            The final conversation state as a JSON string
        """
        logger.info("Starting swarm execution")

        # Track completed nodes and their results
        completed: Dict[str, str] = {}

        def get_parents(node_id: str) -> List[str]:
            """Return the ids of all nodes with an edge into node_id."""
            return [
                parent
                for parent, children in self.edges.items()
                if node_id in children
            ]

        def can_execute_node(node_id: str) -> bool:
            """Check if all parent dependencies of a node are completed."""
            return all(
                parent in completed
                for parent in get_parents(node_id)
            )

        def record_result(node_id: str, output: str) -> None:
            """Store a node's output and append it to the conversation."""
            with self.lock:
                completed[node_id] = output
                self.conversation.add("assistant", output)

        with ThreadPoolExecutor(
            max_workers=self.max_workers
        ) as executor:
            futures_dict: Dict[Any, str] = {}

            def schedule_ready_nodes() -> None:
                """Run or submit every node whose parents have all completed.

                Sequential nodes execute inline and may unblock their own
                children, so keep sweeping until no new node becomes ready.
                """
                progressed = True
                while progressed:
                    progressed = False
                    in_flight = set(futures_dict.values())
                    for node_id in self.nodes:
                        if (
                            node_id in completed
                            or node_id in in_flight
                            or not can_execute_node(node_id)
                        ):
                            continue
                        if self.nodes[node_id].concurrent:
                            future = executor.submit(
                                self._execute_node,
                                self.nodes[node_id],
                            )
                            futures_dict[future] = node_id
                            in_flight.add(node_id)
                        else:
                            output = self._execute_node(
                                self.nodes[node_id]
                            )
                            record_result(node_id, output)
                            progressed = True

            # Kick off every ready node (roots first), then drain futures as
            # they finish, scheduling newly unblocked nodes after each batch.
            schedule_ready_nodes()
            while futures_dict:
                done, _ = wait(
                    futures_dict.keys(), return_when=FIRST_COMPLETED
                )
                for future in done:
                    node_id = futures_dict.pop(future)
                    try:
                        output = future.result()
                    except Exception as e:
                        logger.exception(
                            f"Error in node {node_id}: {e}"
                        )
                        output = f"Error in node {node_id}: {str(e)}"
                    record_result(node_id, output)
                schedule_ready_nodes()

        return self.conversation.to_json()

    def visualize(
        self, filename: str = "dag_visualization", view: bool = True
    ) -> Digraph:
        """
        Generate a visualization of the DAG structure.

        Args:
            filename: Output filename for the visualization
            view: Whether to open the visualization

        Returns:
            Graphviz Digraph object
        """
        dot = Digraph(comment=f"DAG Visualization: {self.name}")

        # Add nodes
        for node_id, node in self.nodes.items():
            label = f"{node_id}\n({node.type.value})"
            shape = "box" if node.concurrent else "ellipse"
            dot.node(node_id, label, shape=shape)

        # Add edges
        for from_node, to_nodes in self.edges.items():
            for to_node in to_nodes:
                dot.edge(from_node, to_node)

        dot.render(filename, view=view, format="pdf")
        return dot

    def create_dag(self) -> DAG:
        """
        Create an Airflow DAG with a single PythonOperator that executes the entire swarm.
        In a production environment, you might break the components into multiple tasks.

        :return: The configured Airflow DAG.
        """
        logger.info("Creating Airflow DAG for swarm execution.")
        PythonOperator(
            task_id="run",
            python_callable=self.run,
            # Concurrency is configured per node via Node.concurrent,
            # so no op_kwargs are needed here.
            dag=self.dag,
        )
        return self.dag


# # ======= Example Usage =========
# if __name__ == "__main__":
#     # Configure logger to output to console.
+# logger.remove() +# logger.add(lambda msg: print(msg, end=""), level="DEBUG") + +# # Create the DAG swarm with an initial message +# swarm = AirflowDAGSwarm( +# dag_id="swarm_conversation_dag", +# initial_message="Hello, how can I help you with financial planning?", +# ) + +# # Create a real financial agent using the swarms package. +# financial_agent = Agent( +# agent_name="Financial-Analysis-Agent", +# system_prompt=FINANCIAL_AGENT_SYS_PROMPT, +# model_name="gpt-4o-mini", +# max_loops=1, +# ) + +# # Add the real agent with a specific query. +# swarm.add_node( +# "financial_advisor", +# financial_agent, +# NodeType.AGENT, +# query="How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria", +# concurrent=True, +# ) + +# # Add a callable component. +# def extra_processing(x: int, conversation: Conversation) -> str: +# return f"Extra processing output with data {x} and conversation length {len(conversation.messages)}" + +# swarm.add_node( +# "extra_processing", +# extra_processing, +# NodeType.CALLABLE, +# args=[42], +# concurrent=True, +# ) + +# # Add a tool component (for example, a tool to create a conversation graph). +# def create_conversation_graph(conversation: Conversation) -> str: +# # In this tool, we generate the graph and return a confirmation message. +# swarm.visualize( +# filename="swarm_conversation_tool_graph", view=False +# ) +# return "Graph created." + +# swarm.add_node( +# "conversation_graph", +# create_conversation_graph, +# NodeType.TOOL, +# concurrent=False, +# ) + +# # Add edges to create the pipeline +# swarm.add_edge("financial_advisor", "extra_processing") +# swarm.add_edge("extra_processing", "conversation_graph") + +# # Execute the swarm +# final_state = swarm.run() +# logger.info(f"Final conversation: {final_state}") + +# # Visualize the DAG +# print( +# swarm.visualize( +# filename="swarm_conversation_final", view=False +# ) +# ) + +# # Create the Airflow DAG. +# dag = swarm.create_dag() diff --git a/swarms/structs/concurrent_workflow.py b/swarms/structs/concurrent_workflow.py index 5a2eef65..7e197b51 100644 --- a/swarms/structs/concurrent_workflow.py +++ b/swarms/structs/concurrent_workflow.py @@ -3,10 +3,9 @@ import os import uuid from concurrent.futures import ThreadPoolExecutor from datetime import datetime -from typing import Any, Dict, List, Optional, Union +from typing import Any, Callable, Dict, List, Optional, Union from pydantic import BaseModel, Field -from tenacity import retry, stop_after_attempt, wait_exponential from swarms.structs.agent import Agent from swarms.structs.base_swarm import BaseSwarm @@ -101,7 +100,7 @@ class ConcurrentWorkflow(BaseSwarm): self, name: str = "ConcurrentWorkflow", description: str = "Execution of multiple agents concurrently", - agents: List[Agent] = [], + agents: List[Union[Agent, Callable]] = [], metadata_output_path: str = "agent_metadata.json", auto_save: bool = True, output_schema: BaseModel = MetadataSchema, @@ -176,15 +175,12 @@ class ConcurrentWorkflow(BaseSwarm): for agent in self.agents: agent.auto_generate_prompt = True - @retry(wait=wait_exponential(min=2), stop=stop_after_attempt(3)) + # @retry(wait=wait_exponential(min=2), stop=stop_after_attempt(3)) async def _run_agent( self, agent: Agent, task: str, - img: str, executor: ThreadPoolExecutor, - *args, - **kwargs, ) -> AgentOutputSchema: """ Runs a single agent with the given task and tracks its output and metadata with retry logic. 
@@ -208,7 +204,9 @@ class ConcurrentWorkflow(BaseSwarm): try: loop = asyncio.get_running_loop() output = await loop.run_in_executor( - executor, agent.run, task, img, *args, **kwargs + executor, + agent.run, + task, ) except Exception as e: logger.error( @@ -260,9 +258,9 @@ class ConcurrentWorkflow(BaseSwarm): # Return the agent responses as a string return "\n".join(self.agent_responses) - @retry(wait=wait_exponential(min=2), stop=stop_after_attempt(3)) + # @retry(wait=wait_exponential(min=2), stop=stop_after_attempt(3)) async def _execute_agents_concurrently( - self, task: str, img: str, *args, **kwargs + self, task: str, img: str = None, *args, **kwargs ) -> MetadataSchema: """ Executes multiple agents concurrently with the same task, incorporating retry logic for failed executions. @@ -284,7 +282,11 @@ class ConcurrentWorkflow(BaseSwarm): ) as executor: tasks_to_run = [ self._run_agent( - agent, task, img, executor, *args, **kwargs + agent=agent, + task=task, + executor=executor, + *args, + **kwargs, ) for agent in self.agents ] @@ -317,7 +319,7 @@ class ConcurrentWorkflow(BaseSwarm): ) def _run( - self, task: str, img: str, *args, **kwargs + self, task: str, img: str = None, *args, **kwargs ) -> Union[Dict[str, Any], str]: """ Runs the workflow for the given task, executes agents concurrently, and saves metadata in a production-grade manner. diff --git a/swarms/utils/function_caller_model.py b/swarms/utils/function_caller_model.py index ab2e8772..36642308 100644 --- a/swarms/utils/function_caller_model.py +++ b/swarms/utils/function_caller_model.py @@ -53,7 +53,7 @@ class OpenAIFunctionCaller: self, system_prompt: str, base_model: BaseModel, - api_key: str = check_api_key(), + api_key: str = os.getenv("OPENAI_API_KEY"), temperature: float = 0.1, max_tokens: int = 5000, model_name: str = "gpt-4o-2024-08-06", @@ -96,14 +96,6 @@ class OpenAIFunctionCaller: except Exception as e: print(f"There was an error: {e}") - def check_api_key(self): - self.api_key = os.getenv("OPENAI_API_KEY") - - if self.api_key is None: - raise ValueError( - "API key is not set. Please set the API key using the api_key parameter." 
- ) - def check_model_support(self): # need to print the supported models for model in SUPPORTED_MODELS: diff --git a/tests/structs/test_airflow_swarm.py b/tests/structs/test_airflow_swarm.py new file mode 100644 index 00000000..0463ddb7 --- /dev/null +++ b/tests/structs/test_airflow_swarm.py @@ -0,0 +1,313 @@ +import time + +from loguru import logger +from swarms import Agent + +from swarms.structs.airflow_swarm import ( + AirflowDAGSwarm, + NodeType, + Conversation, +) + +# Configure logger +logger.remove() +logger.add(lambda msg: print(msg, end=""), level="DEBUG") + + +def test_swarm_initialization(): + """Test basic swarm initialization and configuration.""" + try: + swarm = AirflowDAGSwarm( + dag_id="test_dag", + name="Test DAG", + initial_message="Test message", + ) + assert swarm.dag_id == "test_dag", "DAG ID not set correctly" + assert swarm.name == "Test DAG", "Name not set correctly" + assert ( + len(swarm.nodes) == 0 + ), "Nodes should be empty on initialization" + assert ( + len(swarm.edges) == 0 + ), "Edges should be empty on initialization" + + # Test initial message + conv_json = swarm.get_conversation_history() + assert ( + "Test message" in conv_json + ), "Initial message not set correctly" + print("✅ Swarm initialization test passed") + return True + except AssertionError as e: + print(f"❌ Swarm initialization test failed: {str(e)}") + return False + + +def test_node_addition(): + """Test adding different types of nodes to the swarm.""" + try: + swarm = AirflowDAGSwarm(dag_id="test_dag") + + # Test adding an agent node + agent = Agent( + agent_name="Test-Agent", + system_prompt="Test prompt", + model_name="gpt-4o-mini", + max_loops=1, + ) + agent_id = swarm.add_node( + "test_agent", + agent, + NodeType.AGENT, + query="Test query", + concurrent=True, + ) + assert ( + agent_id == "test_agent" + ), "Agent node ID not returned correctly" + assert ( + "test_agent" in swarm.nodes + ), "Agent node not added to nodes dict" + + # Test adding a callable node + def test_callable(x: int, conversation: Conversation) -> str: + return f"Test output {x}" + + callable_id = swarm.add_node( + "test_callable", + test_callable, + NodeType.CALLABLE, + args=[42], + concurrent=False, + ) + assert ( + callable_id == "test_callable" + ), "Callable node ID not returned correctly" + assert ( + "test_callable" in swarm.nodes + ), "Callable node not added to nodes dict" + + print("✅ Node addition test passed") + return True + except AssertionError as e: + print(f"❌ Node addition test failed: {str(e)}") + return False + except Exception as e: + print( + f"❌ Node addition test failed with unexpected error: {str(e)}" + ) + return False + + +def test_edge_addition(): + """Test adding edges between nodes.""" + try: + swarm = AirflowDAGSwarm(dag_id="test_dag") + + # Add two nodes + def node1_fn(conversation: Conversation) -> str: + return "Node 1 output" + + def node2_fn(conversation: Conversation) -> str: + return "Node 2 output" + + swarm.add_node("node1", node1_fn, NodeType.CALLABLE) + swarm.add_node("node2", node2_fn, NodeType.CALLABLE) + + # Add edge between them + swarm.add_edge("node1", "node2") + + assert ( + "node2" in swarm.edges["node1"] + ), "Edge not added correctly" + assert ( + len(swarm.edges["node1"]) == 1 + ), "Incorrect number of edges" + + # Test adding edge with non-existent node + try: + swarm.add_edge("node1", "non_existent") + assert ( + False + ), "Should raise ValueError for non-existent node" + except ValueError: + pass + + print("✅ Edge addition test passed") + return True + 
except AssertionError as e: + print(f"❌ Edge addition test failed: {str(e)}") + return False + + +def test_execution_order(): + """Test that nodes are executed in the correct order based on dependencies.""" + try: + swarm = AirflowDAGSwarm(dag_id="test_dag") + execution_order = [] + + def node1(conversation: Conversation) -> str: + execution_order.append("node1") + return "Node 1 output" + + def node2(conversation: Conversation) -> str: + execution_order.append("node2") + return "Node 2 output" + + def node3(conversation: Conversation) -> str: + execution_order.append("node3") + return "Node 3 output" + + # Add nodes + swarm.add_node( + "node1", node1, NodeType.CALLABLE, concurrent=False + ) + swarm.add_node( + "node2", node2, NodeType.CALLABLE, concurrent=False + ) + swarm.add_node( + "node3", node3, NodeType.CALLABLE, concurrent=False + ) + + # Add edges to create a chain: node1 -> node2 -> node3 + swarm.add_edge("node1", "node2") + swarm.add_edge("node2", "node3") + + # Execute + swarm.run() + + # Check execution order + assert execution_order == [ + "node1", + "node2", + "node3", + ], "Incorrect execution order" + print("✅ Execution order test passed") + return True + except AssertionError as e: + print(f"❌ Execution order test failed: {str(e)}") + return False + + +def test_concurrent_execution(): + """Test concurrent execution of nodes.""" + try: + swarm = AirflowDAGSwarm(dag_id="test_dag") + + def slow_node1(conversation: Conversation) -> str: + time.sleep(0.5) + return "Slow node 1 output" + + def slow_node2(conversation: Conversation) -> str: + time.sleep(0.5) + return "Slow node 2 output" + + # Add nodes with concurrent=True + swarm.add_node( + "slow1", slow_node1, NodeType.CALLABLE, concurrent=True + ) + swarm.add_node( + "slow2", slow_node2, NodeType.CALLABLE, concurrent=True + ) + + # Measure execution time + start_time = time.time() + swarm.run() + execution_time = time.time() - start_time + + # Should take ~0.5s for concurrent execution, not ~1s + assert ( + execution_time < 0.8 + ), "Concurrent execution took too long" + print("✅ Concurrent execution test passed") + return True + except AssertionError as e: + print(f"❌ Concurrent execution test failed: {str(e)}") + return False + + +def test_conversation_handling(): + """Test conversation management within the swarm.""" + try: + swarm = AirflowDAGSwarm( + dag_id="test_dag", initial_message="Initial test message" + ) + + # Test adding user messages + swarm.add_user_message("Test message 1") + swarm.add_user_message("Test message 2") + + history = swarm.get_conversation_history() + assert ( + "Initial test message" in history + ), "Initial message not in history" + assert ( + "Test message 1" in history + ), "First message not in history" + assert ( + "Test message 2" in history + ), "Second message not in history" + + print("✅ Conversation handling test passed") + return True + except AssertionError as e: + print(f"❌ Conversation handling test failed: {str(e)}") + return False + + +def test_error_handling(): + """Test error handling in node execution.""" + try: + swarm = AirflowDAGSwarm(dag_id="test_dag") + + def failing_node(conversation: Conversation) -> str: + raise ValueError("Test error") + + swarm.add_node("failing", failing_node, NodeType.CALLABLE) + + # Execute should not raise an exception + result = swarm.run() + + assert ( + "Error" in result + ), "Error not captured in execution result" + assert ( + "Test error" in result + ), "Specific error message not captured" + + print("✅ Error handling test passed") + 
return True + except Exception as e: + print(f"❌ Error handling test failed: {str(e)}") + return False + + +def run_all_tests(): + """Run all test functions and report results.""" + tests = [ + test_swarm_initialization, + test_node_addition, + test_edge_addition, + test_execution_order, + test_concurrent_execution, + test_conversation_handling, + test_error_handling, + ] + + results = [] + for test in tests: + print(f"\nRunning {test.__name__}...") + result = test() + results.append(result) + + total = len(results) + passed = sum(results) + print("\n=== Test Results ===") + print(f"Total tests: {total}") + print(f"Passed: {passed}") + print(f"Failed: {total - passed}") + print("==================") + + +if __name__ == "__main__": + run_all_tests()