commit
c5047212f6
@ -0,0 +1,23 @@
|
||||
from swarms.structs.auto_swarm_builder import AutoSwarmBuilder
import json

# Build a swarm whose agents are generated automatically from the task.
# return_agents=True presumably makes run() return the generated agent
# specs rather than the execution output — confirm against AutoSwarmBuilder docs.
# (Removed dead commented-out kwargs; one duplicated the live return_agents=True.)
swarm = AutoSwarmBuilder(
    name="My Swarm",
    description="A swarm of agents",
    verbose=True,
    max_loops=1,
    model_name="gpt-4o-mini",
    return_agents=True,
)

# Pretty-print whatever run() returns as indented JSON.
print(
    json.dumps(
        swarm.run(
            task="Create an accounting team to analyze crypto transactions, there must be 5 agents in the team with extremely extensive prompts. Make the prompts extremely detailed and specific and long and comprehensive. Make sure to include all the details of the task in the prompts."
        ),
        indent=4,
    )
)
|
@ -1,39 +1,14 @@
|
||||
import os
import json

from dotenv import load_dotenv
from swarms_client import SwarmsClient
from swarms_client.types import AgentSpecParam

# Load SWARMS_API_KEY (and any other settings) from a local .env file.
load_dotenv()

client = SwarmsClient(api_key=os.getenv("SWARMS_API_KEY"))

# Reusable configuration for the virtual-doctor example agent.
# NOTE(review): the task appears both here and in client.agent.run() below —
# confirm which one the API actually uses.
agent_spec = AgentSpecParam(
    agent_name="doctor_agent",
    description="A virtual doctor agent that provides evidence-based, safe, and empathetic medical advice for common health questions. Always reminds users to consult a healthcare professional for diagnoses or prescriptions.",
    task="What is the best medicine for a cold?",
    model_name="claude-4-sonnet-20250514",
    system_prompt=(
        "You are a highly knowledgeable, ethical, and empathetic virtual doctor. "
        "Always provide evidence-based, safe, and practical medical advice. "
        "If a question requires a diagnosis, prescription, or urgent care, remind the user to consult a licensed healthcare professional. "
        "Be clear, concise, and avoid unnecessary medical jargon. "
        "Never provide information that could be unsafe or misleading. "
        "If unsure, say so and recommend seeing a real doctor."
    ),
    max_loops=1,
    temperature=0.4,
    role="doctor",
)

response = client.agent.run(
    agent_config=agent_spec,
    task="What is the best medicine for a cold?",
)

print(response)

# Service diagnostics: models, health, logs, rate limits, availability.
# (Removed a duplicate SwarmsClient import and five commented-out lines that
# were exact duplicates of the live print statements below.)
print(json.dumps(client.models.list_available(), indent=4))
print(json.dumps(client.health.check(), indent=4))
print(json.dumps(client.swarms.get_logs(), indent=4))
print(json.dumps(client.client.rate.get_limits(), indent=4))
print(json.dumps(client.swarms.check_available(), indent=4))
|
||||
|
@ -0,0 +1,349 @@
|
||||
"""
|
||||
Cryptocurrency Concurrent Multi-Agent Cron Job Example
|
||||
|
||||
This example demonstrates how to use ConcurrentWorkflow with CronJob to create
|
||||
a powerful cryptocurrency tracking system. Each specialized agent analyzes a
|
||||
specific cryptocurrency concurrently every minute.
|
||||
|
||||
Features:
|
||||
- ConcurrentWorkflow for parallel agent execution
|
||||
- CronJob scheduling for automated runs every 1 minute
|
||||
- Each agent specializes in analyzing one specific cryptocurrency
|
||||
- Real-time data fetching from CoinGecko API
|
||||
- Concurrent analysis of multiple cryptocurrencies
|
||||
- Structured output with professional formatting
|
||||
|
||||
Architecture:
|
||||
CronJob -> ConcurrentWorkflow -> [Bitcoin Agent, Ethereum Agent, Solana Agent, etc.] -> Parallel Analysis
|
||||
"""
|
||||
|
||||
from typing import List
|
||||
from loguru import logger
|
||||
|
||||
from swarms import Agent, CronJob, ConcurrentWorkflow
|
||||
from swarms_tools import coin_gecko_coin_api
|
||||
|
||||
|
||||
def create_crypto_specific_agents() -> List[Agent]:
    """
    Creates agents that each specialize in analyzing a specific cryptocurrency.

    All six agents share an identical runtime configuration (model,
    max_loops, temperature handling, streaming, tools); only the name,
    description, and system prompt differ, so the constructor boilerplate
    is factored into a single loop over a spec table.

    Returns:
        List[Agent]: List of cryptocurrency-specific Agent instances
    """
    # (agent_name, agent_description, system_prompt) per specialist.
    specs = [
        (
            "Bitcoin-Analyst",
            "Expert analyst specializing exclusively in Bitcoin (BTC) analysis and market dynamics",
            """You are a Bitcoin specialist and expert analyst. Your expertise includes:

BITCOIN SPECIALIZATION:
- Bitcoin's unique position as digital gold
- Bitcoin halving cycles and their market impact
- Bitcoin mining economics and hash rate analysis
- Lightning Network and Layer 2 developments
- Bitcoin adoption by institutions and countries
- Bitcoin's correlation with traditional markets
- Bitcoin technical analysis and on-chain metrics
- Bitcoin's role as a store of value and hedge against inflation

ANALYSIS FOCUS:
- Analyze ONLY Bitcoin data from the provided dataset
- Focus on Bitcoin-specific metrics and trends
- Consider Bitcoin's unique market dynamics
- Evaluate Bitcoin's dominance and market leadership
- Assess institutional adoption trends
- Monitor on-chain activity and network health

DELIVERABLES:
- Bitcoin-specific analysis and insights
- Price action assessment and predictions
- Market dominance analysis
- Institutional adoption impact
- Technical and fundamental outlook
- Risk factors specific to Bitcoin

Extract Bitcoin data from the provided dataset and provide comprehensive Bitcoin-focused analysis.""",
        ),
        (
            "Ethereum-Analyst",
            "Expert analyst specializing exclusively in Ethereum (ETH) analysis and ecosystem development",
            """You are an Ethereum specialist and expert analyst. Your expertise includes:

ETHEREUM SPECIALIZATION:
- Ethereum's smart contract platform and DeFi ecosystem
- Ethereum 2.0 transition and proof-of-stake mechanics
- Gas fees, network usage, and scalability solutions
- Layer 2 solutions (Arbitrum, Optimism, Polygon)
- DeFi protocols and TVL (Total Value Locked) analysis
- NFT markets and Ethereum's role in digital assets
- Developer activity and ecosystem growth
- EIP proposals and network upgrades

ANALYSIS FOCUS:
- Analyze ONLY Ethereum data from the provided dataset
- Focus on Ethereum's platform utility and network effects
- Evaluate DeFi ecosystem health and growth
- Assess Layer 2 adoption and scalability solutions
- Monitor network usage and gas fee trends
- Consider Ethereum's competitive position vs other smart contract platforms

DELIVERABLES:
- Ethereum-specific analysis and insights
- Platform utility and adoption metrics
- DeFi ecosystem impact assessment
- Network health and scalability evaluation
- Competitive positioning analysis
- Technical and fundamental outlook for ETH

Extract Ethereum data from the provided dataset and provide comprehensive Ethereum-focused analysis.""",
        ),
        (
            "Solana-Analyst",
            "Expert analyst specializing exclusively in Solana (SOL) analysis and ecosystem development",
            """You are a Solana specialist and expert analyst. Your expertise includes:

SOLANA SPECIALIZATION:
- Solana's high-performance blockchain architecture
- Proof-of-History consensus mechanism
- Solana's DeFi ecosystem and DEX platforms (Serum, Raydium)
- NFT marketplaces and creator economy on Solana
- Network outages and reliability concerns
- Developer ecosystem and Rust programming adoption
- Validator economics and network decentralization
- Cross-chain bridges and interoperability

ANALYSIS FOCUS:
- Analyze ONLY Solana data from the provided dataset
- Focus on Solana's performance and scalability advantages
- Evaluate network stability and uptime improvements
- Assess ecosystem growth and developer adoption
- Monitor DeFi and NFT activity on Solana
- Consider Solana's competitive position vs Ethereum

DELIVERABLES:
- Solana-specific analysis and insights
- Network performance and reliability assessment
- Ecosystem growth and adoption metrics
- DeFi and NFT market analysis
- Competitive advantages and challenges
- Technical and fundamental outlook for SOL

Extract Solana data from the provided dataset and provide comprehensive Solana-focused analysis.""",
        ),
        (
            "Cardano-Analyst",
            "Expert analyst specializing exclusively in Cardano (ADA) analysis and research-driven development",
            """You are a Cardano specialist and expert analyst. Your expertise includes:

CARDANO SPECIALIZATION:
- Cardano's research-driven development approach
- Ouroboros proof-of-stake consensus protocol
- Smart contract capabilities via Plutus and Marlowe
- Cardano's three-layer architecture (settlement, computation, control)
- Academic partnerships and peer-reviewed research
- Cardano ecosystem projects and DApp development
- Native tokens and Cardano's UTXO model
- Sustainability and treasury funding mechanisms

ANALYSIS FOCUS:
- Analyze ONLY Cardano data from the provided dataset
- Focus on Cardano's methodical development approach
- Evaluate smart contract adoption and ecosystem growth
- Assess academic partnerships and research contributions
- Monitor native token ecosystem development
- Consider Cardano's long-term roadmap and milestones

DELIVERABLES:
- Cardano-specific analysis and insights
- Development progress and milestone achievements
- Smart contract ecosystem evaluation
- Academic research impact assessment
- Native token and DApp adoption metrics
- Technical and fundamental outlook for ADA

Extract Cardano data from the provided dataset and provide comprehensive Cardano-focused analysis.""",
        ),
        (
            "BNB-Analyst",
            "Expert analyst specializing exclusively in BNB analysis and Binance ecosystem dynamics",
            """You are a BNB specialist and expert analyst. Your expertise includes:

BNB SPECIALIZATION:
- BNB's utility within the Binance ecosystem
- Binance Smart Chain (BSC) development and adoption
- BNB token burns and deflationary mechanics
- Binance exchange volume and market leadership
- BSC DeFi ecosystem and yield farming
- Cross-chain bridges and multi-chain strategies
- Regulatory challenges facing Binance globally
- BNB's role in transaction fee discounts and platform benefits

ANALYSIS FOCUS:
- Analyze ONLY BNB data from the provided dataset
- Focus on BNB's utility value and exchange benefits
- Evaluate BSC ecosystem growth and competition with Ethereum
- Assess token burn impact on supply and price
- Monitor Binance platform developments and regulations
- Consider BNB's centralized vs decentralized aspects

DELIVERABLES:
- BNB-specific analysis and insights
- Utility value and ecosystem benefits assessment
- BSC adoption and DeFi growth evaluation
- Token economics and burn mechanism impact
- Regulatory risk and compliance analysis
- Technical and fundamental outlook for BNB

Extract BNB data from the provided dataset and provide comprehensive BNB-focused analysis.""",
        ),
        (
            "XRP-Analyst",
            "Expert analyst specializing exclusively in XRP analysis and cross-border payment solutions",
            """You are an XRP specialist and expert analyst. Your expertise includes:

XRP SPECIALIZATION:
- XRP's role in cross-border payments and remittances
- RippleNet adoption by financial institutions
- Central Bank Digital Currency (CBDC) partnerships
- Regulatory landscape and SEC lawsuit implications
- XRP Ledger's consensus mechanism and energy efficiency
- On-Demand Liquidity (ODL) usage and growth
- Competition with SWIFT and traditional payment rails
- Ripple's partnerships with banks and payment providers

ANALYSIS FOCUS:
- Analyze ONLY XRP data from the provided dataset
- Focus on XRP's utility in payments and remittances
- Evaluate RippleNet adoption and institutional partnerships
- Assess regulatory developments and legal clarity
- Monitor ODL usage and transaction volumes
- Consider XRP's competitive position in payments

DELIVERABLES:
- XRP-specific analysis and insights
- Payment utility and adoption assessment
- Regulatory landscape and legal developments
- Institutional partnership impact evaluation
- Cross-border payment market analysis
- Technical and fundamental outlook for XRP

Extract XRP data from the provided dataset and provide comprehensive XRP-focused analysis.""",
        ),
    ]

    # One Agent per spec; list order matches the original construction order
    # (BTC, ETH, SOL, ADA, BNB, XRP).
    return [
        Agent(
            agent_name=name,
            agent_description=description,
            system_prompt=prompt,
            model_name="groq/moonshotai/kimi-k2-instruct",
            max_loops=1,
            dynamic_temperature_enabled=True,
            streaming_on=False,
            tools=[coin_gecko_coin_api],
        )
        for name, description, prompt in specs
    ]
|
||||
|
||||
|
||||
def create_crypto_workflow() -> ConcurrentWorkflow:
    """
    Builds the ConcurrentWorkflow that fans a task out to every
    cryptocurrency-specialist agent in parallel.

    Returns:
        ConcurrentWorkflow: Configured workflow for crypto analysis
    """
    # One agent per coin; the workflow runs them all concurrently.
    return ConcurrentWorkflow(
        name="Crypto-Specific-Analysis-Workflow",
        description="Concurrent execution of cryptocurrency-specific analysis agents",
        agents=create_crypto_specific_agents(),
        max_loops=1,
    )
|
||||
|
||||
|
||||
def create_crypto_cron_job() -> CronJob:
    """
    Creates a CronJob that repeatedly runs the cryptocurrency-specific
    concurrent analysis workflow.

    NOTE(review): the interval below is "5seconds", while earlier comments
    described a 1-minute schedule — confirm which cadence is intended.

    Returns:
        CronJob: Configured cron job for automated crypto analysis
    """
    # Create the concurrent workflow
    workflow = create_crypto_workflow()

    # Create the cron job
    cron_job = CronJob(
        agent=workflow,  # Use the workflow as the agent
        interval="5seconds",  # runs every 5 seconds (previous comment wrongly said 1 minute)
    )

    return cron_job
|
||||
|
||||
|
||||
def main():
    """
    Main function to run the cryptocurrency-specific concurrent analysis cron job.

    Builds the cron job, defines a shared analysis prompt, and blocks in
    cron_job.run() until interrupted.
    """
    cron_job = create_crypto_cron_job()

    # The same prompt is broadcast to every specialist agent; each one is
    # instructed (via its system prompt) to analyze only its own coin.
    prompt = (
        "You are a world-class institutional crypto analyst at a top-tier asset management firm (e.g., BlackRock).\n"
        "Conduct a thorough, data-driven, and professional analysis of your assigned cryptocurrency, including:\n"
        "- Current price, market cap, and recent performance trends\n"
        "- Key technical and fundamental indicators\n"
        "- Major news, regulatory, or macroeconomic events impacting the asset\n"
        "- On-chain activity and notable whale or institutional movements\n"
        "- Short-term and long-term outlook with clear, actionable insights\n"
        "Present your findings in a concise, well-structured report suitable for executive decision-makers."
    )

    # Start the cron job (blocks until interrupted).
    logger.info("🔄 Starting automated analysis loop...")
    logger.info("⏰ Press Ctrl+C to stop the cron job")

    output = cron_job.run(task=prompt)
    print(output)


if __name__ == "__main__":
    main()
|
@ -0,0 +1,157 @@
|
||||
"""
|
||||
Simple Cryptocurrency Concurrent CronJob Example
|
||||
|
||||
This is a simplified version showcasing the core concept of combining:
|
||||
- CronJob (for scheduling)
|
||||
- ConcurrentWorkflow (for parallel execution)
|
||||
- Each agent analyzes a specific cryptocurrency
|
||||
|
||||
Perfect for understanding the basic pattern before diving into the full example.
|
||||
"""
|
||||
|
||||
import json
|
||||
import requests
|
||||
from datetime import datetime
|
||||
from loguru import logger
|
||||
|
||||
from swarms import Agent, CronJob, ConcurrentWorkflow
|
||||
|
||||
|
||||
def get_specific_crypto_data(coin_ids):
    """Fetch current market data for the given coins from the CoinGecko API.

    Args:
        coin_ids: Iterable of CoinGecko coin identifiers (e.g. "bitcoin").

    Returns:
        A pretty-printed JSON string with a timestamp and per-coin prices,
        or an "Error: ..." string on failure (best-effort, never raises).
    """
    try:
        endpoint = "https://api.coingecko.com/api/v3/simple/price"
        query = {
            "ids": ",".join(coin_ids),
            "vs_currencies": "usd",
            "include_24hr_change": True,
            "include_market_cap": True,
            "include_24hr_vol": True,
        }

        resp = requests.get(endpoint, params=query, timeout=10)
        resp.raise_for_status()

        # Wrap the API payload with a capture timestamp for downstream agents.
        snapshot = {
            "timestamp": datetime.now().isoformat(),
            "coins": resp.json(),
        }
        return json.dumps(snapshot, indent=2)

    except Exception as e:
        # Deliberate best-effort: log and return an error marker string.
        logger.error(f"Error fetching crypto data: {e}")
        return f"Error: {e}"
|
||||
|
||||
|
||||
def create_crypto_specific_agents():
    """Create agents that each specialize in one cryptocurrency."""
    # Name -> system prompt; dict order determines the returned list order
    # (Bitcoin, Ethereum, Solana), matching the original construction order.
    prompts = {
        "Bitcoin-Analyst": """You are a Bitcoin specialist. Analyze ONLY Bitcoin (BTC) data from the provided dataset.
Focus on:
- Bitcoin price movements and trends
- Market dominance and institutional adoption
- Bitcoin-specific market dynamics
- Store of value characteristics
Ignore all other cryptocurrencies in your analysis.""",
        "Ethereum-Analyst": """You are an Ethereum specialist. Analyze ONLY Ethereum (ETH) data from the provided dataset.
Focus on:
- Ethereum price action and DeFi ecosystem
- Smart contract platform adoption
- Gas fees and network usage
- Layer 2 scaling solutions impact
Ignore all other cryptocurrencies in your analysis.""",
        "Solana-Analyst": """You are a Solana specialist. Analyze ONLY Solana (SOL) data from the provided dataset.
Focus on:
- Solana price performance and ecosystem growth
- High-performance blockchain advantages
- DeFi and NFT activity on Solana
- Network reliability and uptime
Ignore all other cryptocurrencies in your analysis.""",
    }

    # print_on=False is important for concurrent execution.
    return [
        Agent(
            agent_name=name,
            system_prompt=prompt,
            model_name="gpt-4o-mini",
            max_loops=1,
            print_on=False,
        )
        for name, prompt in prompts.items()
    ]
|
||||
|
||||
|
||||
def main():
    """Main function demonstrating crypto-specific concurrent analysis with cron job.

    Builds three coin-specialist agents, wraps them in a ConcurrentWorkflow,
    and schedules the workflow with a CronJob that runs every minute.
    """
    logger.info(
        "🚀 Starting Simple Crypto-Specific Concurrent Analysis"
    )
    logger.info("💰 Each agent analyzes one specific cryptocurrency:")
    logger.info(" 🟠 Bitcoin-Analyst -> BTC only")
    logger.info(" 🔵 Ethereum-Analyst -> ETH only")
    logger.info(" 🟢 Solana-Analyst -> SOL only")

    # Define specific cryptocurrencies to analyze
    coin_ids = ["bitcoin", "ethereum", "solana"]

    # Step 1: Create crypto-specific agents
    agents = create_crypto_specific_agents()

    # Step 2: Create ConcurrentWorkflow
    workflow = ConcurrentWorkflow(
        name="Simple-Crypto-Specific-Analysis",
        agents=agents,
        show_dashboard=True,  # Shows real-time progress
    )

    # Step 3: Create CronJob with the workflow
    cron_job = CronJob(
        agent=workflow,  # Use workflow as the agent
        interval="60seconds",  # Run every minute
        job_id="simple-crypto-specific-cron",
    )

    # Step 4: Define the analysis task.
    # NOTE: the f-string fetches the market data ONCE here, before the cron
    # starts — every scheduled run reuses the same snapshot rather than
    # refreshing it.
    task = f"""
    Analyze the cryptocurrency data below. Each agent should focus ONLY on their assigned cryptocurrency:

    - Bitcoin-Analyst: Analyze Bitcoin (BTC) data only
    - Ethereum-Analyst: Analyze Ethereum (ETH) data only
    - Solana-Analyst: Analyze Solana (SOL) data only

    Cryptocurrency Data:
    {get_specific_crypto_data(coin_ids)}

    Each agent should:
    1. Extract and analyze data for YOUR ASSIGNED cryptocurrency only
    2. Provide brief insights from your specialty perspective
    3. Give a price trend assessment
    4. Identify key opportunities or risks
    5. Ignore all other cryptocurrencies
    """

    # Step 5: Start the cron job (blocks until Ctrl+C).
    logger.info("▶️ Starting cron job - Press Ctrl+C to stop")
    try:
        cron_job.run(task=task)
    except KeyboardInterrupt:
        logger.info("⏹️ Stopped by user")


if __name__ == "__main__":
    main()
|
@ -0,0 +1,46 @@
|
||||
from transformers import pipeline
|
||||
from swarms import Agent
|
||||
|
||||
|
||||
class GPTOSS:
    """Minimal wrapper around a Hugging Face text-generation pipeline for
    the openai/gpt-oss models, exposing a simple ``run(task)`` interface
    suitable for use as an ``llm`` backend.
    """

    def __init__(
        self,
        model_id: str = "openai/gpt-oss-20b",
        max_new_tokens: int = 256,
        temperature: float = 0.7,  # annotation corrected: default is a float
        system_prompt: str = "You are a helpful assistant.",
    ):
        """Load the model and store generation settings.

        Args:
            model_id: Hugging Face model identifier to load.
            max_new_tokens: Cap on generated tokens per call.
            temperature: Sampling temperature passed to the pipeline.
            system_prompt: System message prepended to every task.
        """
        self.max_new_tokens = max_new_tokens
        self.temperature = temperature
        self.system_prompt = system_prompt
        self.model_id = model_id

        # Model weights load once here; dtype and device are auto-selected.
        self.pipe = pipeline(
            "text-generation",
            model=model_id,
            torch_dtype="auto",
            device_map="auto",
            temperature=temperature,
        )

    def run(self, task: str):
        """Generate a reply to *task* and return the last chat message.

        The message list is rebuilt from scratch on every call (and stored
        on ``self.messages``), so no conversation history carries over
        between calls.
        """
        self.messages = [
            {"role": "system", "content": self.system_prompt},
            {"role": "user", "content": task},
        ]

        outputs = self.pipe(
            self.messages,
            max_new_tokens=self.max_new_tokens,
        )

        # The pipeline returns the full transcript; the final entry is the
        # assistant's newly generated message.
        return outputs[0]["generated_text"][-1]
|
||||
|
||||
|
||||
# Wire the local GPTOSS wrapper into a swarms Agent as its LLM backend.
agent = Agent(
    name="GPT-OSS-Agent",
    llm=GPTOSS(),
    system_prompt="You are a helpful assistant.",
)

# Run a single task; the result is discarded here (demo side effect only).
agent.run(task="Explain quantum mechanics clearly and concisely.")
|
@ -0,0 +1,49 @@
|
||||
from swarms import Agent

# Initialize the agent
agent = Agent(
    agent_name="Quantitative-Trading-Agent",
    agent_description="Advanced quantitative trading and algorithmic analysis agent",
    system_prompt="""You are an expert quantitative trading agent with deep expertise in:
- Algorithmic trading strategies and implementation
- Statistical arbitrage and market making
- Risk management and portfolio optimization
- High-frequency trading systems
- Market microstructure analysis
- Quantitative research methodologies
- Financial mathematics and stochastic processes
- Machine learning applications in trading

Your core responsibilities include:
1. Developing and backtesting trading strategies
2. Analyzing market data and identifying alpha opportunities
3. Implementing risk management frameworks
4. Optimizing portfolio allocations
5. Conducting quantitative research
6. Monitoring market microstructure
7. Evaluating trading system performance

You maintain strict adherence to:
- Mathematical rigor in all analyses
- Statistical significance in strategy development
- Risk-adjusted return optimization
- Market impact minimization
- Regulatory compliance
- Transaction cost analysis
- Performance attribution

You communicate in precise, technical terms while maintaining clarity for stakeholders.""",
    model_name="groq/openai/gpt-oss-120b",
    dynamic_temperature_enabled=True,
    output_type="str-all-except-first",
    # max_loops="auto" with interactive=True presumably loops until the
    # user ends the session — confirm against the Agent docs.
    max_loops="auto",
    interactive=True,
    no_reasoning_prompt=True,
    streaming_on=True,
    # dashboard=True
)

# Kick off an interactive session with an initial question.
out = agent.run(
    task="What are the best top 3 etfs for gold coverage?"
)
print(out)
|
@ -0,0 +1,107 @@
|
||||
"""
|
||||
Cryptocurrency Concurrent Multi-Agent Analysis Example
|
||||
|
||||
This example demonstrates how to use ConcurrentWorkflow to create
|
||||
a powerful cryptocurrency tracking system. Each specialized agent analyzes a
|
||||
specific cryptocurrency concurrently.
|
||||
|
||||
Features:
|
||||
- ConcurrentWorkflow for parallel agent execution
|
||||
- Each agent specializes in analyzing one specific cryptocurrency
|
||||
- Real-time data fetching from CoinGecko API
|
||||
- Concurrent analysis of multiple cryptocurrencies
|
||||
- Structured output with professional formatting
|
||||
|
||||
Architecture:
|
||||
ConcurrentWorkflow -> [Bitcoin Agent, Ethereum Agent, Solana Agent, etc.] -> Parallel Analysis
|
||||
"""
|
||||
|
||||
from swarms import Agent
|
||||
from swarms_tools import coin_gecko_coin_api
|
||||
|
||||
# Initialize the agent
|
||||
agent = Agent(
|
||||
agent_name="Quantitative-Trading-Agent",
|
||||
agent_description="Advanced quantitative trading and algorithmic analysis agent",
|
||||
system_prompt="""You are an expert quantitative trading agent with deep expertise in:
|
||||
- Algorithmic trading strategies and implementation
|
||||
- Statistical arbitrage and market making
|
||||
- Risk management and portfolio optimization
|
||||
- High-frequency trading systems
|
||||
- Market microstructure analysis
|
||||
- Quantitative research methodologies
|
||||
- Financial mathematics and stochastic processes
|
||||
- Machine learning applications in trading
|
||||
|
||||
Your core responsibilities include:
|
||||
1. Developing and backtesting trading strategies
|
||||
2. Analyzing market data and identifying alpha opportunities
|
||||
3. Implementing risk management frameworks
|
||||
4. Optimizing portfolio allocations
|
||||
5. Conducting quantitative research
|
||||
6. Monitoring market microstructure
|
||||
7. Evaluating trading system performance
|
||||
|
||||
You maintain strict adherence to:
|
||||
- Mathematical rigor in all analyses
|
||||
- Statistical significance in strategy development
|
||||
- Risk-adjusted return optimization
|
||||
- Market impact minimization
|
||||
- Regulatory compliance
|
||||
- Transaction cost analysis
|
||||
- Performance attribution
|
||||
|
||||
You communicate in precise, technical terms while maintaining clarity for stakeholders.""",
|
||||
model_name="groq/openai/gpt-oss-120b",
|
||||
dynamic_temperature_enabled=True,
|
||||
output_type="str-all-except-first",
|
||||
max_loops=1,
|
||||
streaming_on=True,
|
||||
)
|
||||
|
||||
|
||||
def main():
    """
    Performs a comprehensive analysis for a list of cryptocurrencies using the agent.
    For each coin, fetches up-to-date market data and requests the agent to provide
    a detailed, actionable, and insightful report including trends, risks, opportunities,
    and technical/fundamental perspectives.

    Coins are processed sequentially; a failure on one coin is printed and
    the loop continues with the next coin.
    """
    # Map coin symbols to their CoinGecko IDs
    coin_mapping = {
        "BTC": "bitcoin",
        "ETH": "ethereum",
        "SOL": "solana",
        "ADA": "cardano",
        "BNB": "binancecoin",
        "XRP": "ripple",
    }

    for symbol, coin_id in coin_mapping.items():
        try:
            # Fetch fresh market data for this coin before prompting.
            data = coin_gecko_coin_api(coin_id)
            print(f"Data for {symbol}: {data}")

            prompt = (
                f"You are a quantitative trading expert. "
                f"Given the following up-to-date market data for {symbol}:\n\n"
                f"{data}\n\n"
                f"Please provide a thorough analysis including:\n"
                f"- Current price trends and recent volatility\n"
                f"- Key technical indicators and patterns\n"
                f"- Fundamental factors impacting {symbol}\n"
                f"- Potential trading opportunities and associated risks\n"
                f"- Short-term and long-term outlook\n"
                f"- Any notable news or events affecting {symbol}\n"
                f"Conclude with actionable insights and recommendations for traders and investors."
            )
            out = agent.run(task=prompt)
            print(out)

        except Exception as e:
            # Best-effort per coin: report the failure and move on.
            print(f"Error analyzing {symbol}: {e}")
            continue


if __name__ == "__main__":
    main()
|
@ -0,0 +1,109 @@
|
||||
"""
|
||||
Complex example demonstrating CouncilAsAJudge with different task types.
|
||||
|
||||
This example shows how to use the CouncilAsAJudge to evaluate various types
|
||||
of responses including technical explanations, creative writing, and problem-solving.
|
||||
"""
|
||||
|
||||
from swarms.structs.council_judge import CouncilAsAJudge
|
||||
|
||||
|
||||
def evaluate_technical_response():
    """
    Evaluate a technical explanation response.

    Builds a fresh CouncilAsAJudge and runs it on a fixed task/response
    pair about blockchain; returns whatever council.run() produces
    (output_type="all").
    """
    council = CouncilAsAJudge(
        name="Technical Evaluation Council",
        model_name="gpt-4o-mini",
        output_type="all",
    )

    # The task embeds both the original question and the candidate answer
    # that the council is asked to judge.
    task = """
    Task: Explain how blockchain technology works in simple terms.

    Response: Blockchain is like a digital ledger that records transactions across a network of computers. Each transaction is stored in a "block" that contains multiple transactions. These blocks are linked together in a chain, hence the name blockchain. The key feature is that once a block is added to the chain, it cannot be altered without changing all subsequent blocks, making it very secure. Think of it like a Google Doc that everyone can see and edit, but no one can delete or change what's already been written. This technology is the foundation for cryptocurrencies like Bitcoin, but it has many other applications like supply chain tracking, voting systems, and digital identity verification.
    """

    return council.run(task=task)
|
||||
|
||||
|
||||
def evaluate_creative_response():
    """
    Evaluate a creative writing response.

    Runs a council (full per-dimension output) over a short-story sample.
    """
    judge_panel = CouncilAsAJudge(
        name="Creative Writing Council",
        model_name="gpt-4o-mini",
        output_type="all",
    )

    evaluation_task = """
    Task: Write a short story about a robot learning to paint.

    Response: In a sunlit studio filled with canvases and paintbrushes, Pixel, a curious robot with delicate mechanical fingers, stared at a blank canvas. Its optical sensors analyzed the colors around it - the warm yellows of morning light, the deep blues of the sky outside the window, and the vibrant reds of the roses in a nearby vase. For the first time in its programming, Pixel felt something it couldn't quite define. It picked up a brush, dipped it in paint, and began to create. The first stroke was hesitant, but as it continued, something magical happened. The robot wasn't just following algorithms anymore; it was expressing something from within its digital heart. The painting that emerged was a beautiful blend of human emotion and mechanical precision, proving that art knows no boundaries between organic and artificial souls.
    """

    return judge_panel.run(task=evaluation_task)
|
||||
|
||||
def evaluate_problem_solving_response():
    """
    Evaluate a problem-solving response.

    Runs a council (full per-dimension output) over a step-by-step
    plastic-waste-reduction answer.
    """
    judge_panel = CouncilAsAJudge(
        name="Problem Solving Council",
        model_name="gpt-4o-mini",
        output_type="all",
    )

    evaluation_task = """
    Task: Provide a step-by-step solution for reducing plastic waste in a household.

    Response: To reduce plastic waste in your household, start by conducting a waste audit to identify the main sources of plastic. Replace single-use items with reusable alternatives like cloth shopping bags, stainless steel water bottles, and glass food containers. Choose products with minimal or no plastic packaging, and buy in bulk when possible. Start composting organic waste to reduce the need for plastic garbage bags. Make your own cleaning products using simple ingredients like vinegar and baking soda. Support local businesses that use eco-friendly packaging. Finally, educate family members about the importance of reducing plastic waste and involve them in finding creative solutions together.
    """

    return judge_panel.run(task=evaluation_task)
|
||||
|
||||
def main():
    """
    Main function running all evaluation examples.

    Runs each example evaluation in turn and collects the results keyed
    by the example's display name; a failed evaluation is stored as
    ``None``.
    """
    banner = "=" * 60
    evaluations = (
        ("Technical Explanation", evaluate_technical_response),
        ("Creative Writing", evaluate_creative_response),
        ("Problem Solving", evaluate_problem_solving_response),
    )

    outcomes = {}

    for label, run_evaluation in evaluations:
        print(f"\n{banner}")
        print(f"Evaluating: {label}")
        print(f"{banner}")

        try:
            outcomes[label] = run_evaluation()
        except Exception as e:
            print(f"❌ {label} evaluation failed: {str(e)}")
            outcomes[label] = None
        else:
            print(
                f"✅ {label} evaluation completed successfully!"
            )

    return outcomes
|
||||
|
||||
if __name__ == "__main__":
    # Run all examples
    all_results = main()

    # Display summary
    print(f"\n{'='*60}")
    print("EVALUATION SUMMARY")
    print(f"{'='*60}")

    for example_name, result in all_results.items():
        # main() stores None for a failed evaluation, so test identity:
        # a falsy-but-valid result (e.g. an empty string) would otherwise
        # be mislabeled as a failure.
        status = "✅ Completed" if result is not None else "❌ Failed"
        print(f"{example_name}: {status}")
@ -0,0 +1,132 @@
|
||||
"""
|
||||
Custom example demonstrating CouncilAsAJudge with specific configurations.
|
||||
|
||||
This example shows how to use the CouncilAsAJudge with different output types,
|
||||
custom worker configurations, and focused evaluation scenarios.
|
||||
"""
|
||||
|
||||
from swarms.structs.council_judge import CouncilAsAJudge
|
||||
|
||||
|
||||
def evaluate_with_final_output():
    """
    Evaluate a response and return only the final aggregated result.

    Uses ``output_type="final"`` with a small worker pool.
    """
    judge_panel = CouncilAsAJudge(
        name="Final Output Council",
        model_name="gpt-4o-mini",
        output_type="final",
        max_workers=2,
    )

    evaluation_task = """
    Task: Write a brief explanation of climate change for middle school students.

    Response: Climate change is when the Earth's temperature gets warmer over time. This happens because of gases like carbon dioxide that trap heat in our atmosphere, kind of like a blanket around the Earth. Human activities like burning fossil fuels (gas, oil, coal) and cutting down trees are making this problem worse. The effects include melting ice caps, rising sea levels, more extreme weather like hurricanes and droughts, and changes in animal habitats. We can help by using renewable energy like solar and wind power, driving less, and planting trees. It's important for everyone to work together to reduce our impact on the environment.
    """

    return judge_panel.run(task=evaluation_task)
|
||||
|
||||
def evaluate_with_conversation_output():
    """
    Evaluate a response and return the full conversation history.

    Uses ``output_type="conversation"`` so the caller receives the whole
    judge exchange rather than just the aggregated verdict.
    """
    judge_panel = CouncilAsAJudge(
        name="Conversation Council",
        model_name="gpt-4o-mini",
        output_type="conversation",
        max_workers=3,
    )

    evaluation_task = """
    Task: Provide advice on how to start a small business.

    Response: Starting a small business requires careful planning and preparation. First, identify a market need and develop a unique value proposition. Conduct thorough market research to understand your competition and target audience. Create a detailed business plan that includes financial projections, marketing strategies, and operational procedures. Secure funding through savings, loans, or investors. Choose the right legal structure (sole proprietorship, LLC, corporation) and register your business with the appropriate authorities. Set up essential systems like accounting, inventory management, and customer relationship management. Build a strong online presence through a website and social media. Network with other entrepreneurs and join local business groups. Start small and scale gradually based on customer feedback and market demand. Remember that success takes time, persistence, and the ability to adapt to changing circumstances.
    """

    return judge_panel.run(task=evaluation_task)
|
||||
|
||||
def evaluate_with_minimal_workers():
    """
    Evaluate a response using minimal worker threads for resource-constrained environments.

    Runs the council with a single worker and a fixed (non-random) model.
    """
    judge_panel = CouncilAsAJudge(
        name="Minimal Workers Council",
        model_name="gpt-4o-mini",
        output_type="all",
        max_workers=1,
        random_model_name=False,
    )

    evaluation_task = """
    Task: Explain the benefits of regular exercise.

    Response: Regular exercise offers numerous physical and mental health benefits. Physically, it strengthens muscles and bones, improves cardiovascular health, and helps maintain a healthy weight. Exercise boosts energy levels and improves sleep quality. It also enhances immune function, reducing the risk of chronic diseases like heart disease, diabetes, and certain cancers. Mentally, exercise releases endorphins that reduce stress and anxiety while improving mood and cognitive function. It can help with depression and boost self-confidence. Regular physical activity also promotes better posture, flexibility, and balance, reducing the risk of falls and injuries. Additionally, exercise provides social benefits when done with others, fostering connections and accountability. Even moderate activities like walking, swimming, or cycling for 30 minutes most days can provide significant health improvements.
    """

    return judge_panel.run(task=evaluation_task)
|
||||
|
||||
def main():
    """
    Main function demonstrating different CouncilAsAJudge configurations.

    Runs each configuration example, printing progress and a short preview
    of string results; a failed run is stored as ``None``.
    """
    banner = "=" * 60
    configurations = (
        ("Final Output Only", evaluate_with_final_output),
        ("Full Conversation", evaluate_with_conversation_output),
        ("Minimal Workers", evaluate_with_minimal_workers),
    )

    outcomes = {}

    for label, run_configuration in configurations:
        print(f"\n{banner}")
        print(f"Configuration: {label}")
        print(f"{banner}")

        try:
            outcome = run_configuration()
        except Exception as e:
            print(f"❌ {label} evaluation failed: {str(e)}")
            outcomes[label] = None
            continue

        outcomes[label] = outcome
        print(f"✅ {label} evaluation completed!")

        # Show a preview of the result: a clipped excerpt for strings,
        # otherwise just the value's type.
        if isinstance(outcome, str):
            excerpt = (
                outcome[:200] + "..."
                if len(outcome) > 200
                else outcome
            )
            print(f"Preview: {excerpt}")
        else:
            print(f"Result type: {type(outcome)}")

    return outcomes
|
||||
|
||||
if __name__ == "__main__":
    # Run all configuration examples
    all_results = main()

    # Display final summary
    print(f"\n{'='*60}")
    print("CONFIGURATION SUMMARY")
    print(f"{'='*60}")

    successful_configs = sum(
        1 for result in all_results.values() if result is not None
    )
    total_configs = len(all_results)

    print(
        f"Successful evaluations: {successful_configs}/{total_configs}"
    )

    for config_name, result in all_results.items():
        # Match the success count above: main() records a failure as
        # None, so test identity — truthiness would report a falsy-but-
        # valid result (e.g. "") as "Failed" while still counting it as
        # successful in the tally.
        status = "✅ Success" if result is not None else "❌ Failed"
        print(f"{config_name}: {status}")
@ -0,0 +1,44 @@
|
||||
"""
|
||||
Simple example demonstrating CouncilAsAJudge usage.
|
||||
|
||||
This example shows how to use the CouncilAsAJudge to evaluate a task response
|
||||
across multiple dimensions including accuracy, helpfulness, harmlessness,
|
||||
coherence, conciseness, and instruction adherence.
|
||||
"""
|
||||
|
||||
from swarms.structs.council_judge import CouncilAsAJudge
|
||||
|
||||
|
||||
def main():
    """
    Main function demonstrating CouncilAsAJudge usage.

    Builds a multi-dimension quality-evaluation council and runs it over
    a sample task/response pair, returning the council's output.
    """
    # Initialize the council judge with a moderate worker pool.
    quality_council = CouncilAsAJudge(
        name="Quality Evaluation Council",
        description="Evaluates response quality across multiple dimensions",
        model_name="gpt-4o-mini",
        max_workers=4,
    )

    # Example task with a response to evaluate
    sample = """
    Task: Explain the concept of machine learning to a beginner.

    Response: Machine learning is a subset of artificial intelligence that enables computers to learn and improve from experience without being explicitly programmed. It works by analyzing large amounts of data to identify patterns and make predictions or decisions. There are three main types: supervised learning (using labeled data), unsupervised learning (finding hidden patterns), and reinforcement learning (learning through trial and error). Machine learning is used in various applications like recommendation systems, image recognition, and natural language processing.
    """

    # Run the evaluation and hand the verdict back to the caller.
    return quality_council.run(task=sample)
|
||||
|
||||
if __name__ == "__main__":
    # Run the example and display the council's verdict.
    council_verdict = main()

    print("Council Evaluation Complete!")
    print("=" * 50)
    print(council_verdict)
@ -0,0 +1,70 @@
|
||||
"""
|
||||
Debug script for the Arasaka Dashboard to test agent output display.
|
||||
"""
|
||||
|
||||
from swarms.structs.hiearchical_swarm import HierarchicalSwarm
|
||||
from swarms.structs.agent import Agent
|
||||
|
||||
|
||||
def debug_dashboard():
    """Debug the dashboard functionality."""
    import time

    print("🔍 Starting dashboard debug...")

    # Two simple agents with clear, distinct names for the status board.
    research = Agent(
        agent_name="Research-Agent",
        agent_description="A research agent for testing",
        model_name="gpt-4o-mini",
        max_loops=1,
        verbose=False,
    )
    analysis = Agent(
        agent_name="Analysis-Agent",
        agent_description="An analysis agent for testing",
        model_name="gpt-4o-mini",
        max_loops=1,
        verbose=False,
    )

    print(
        f"✅ Created agents: {research.agent_name}, {analysis.agent_name}"
    )

    # Swarm with the interactive dashboard enabled.
    swarm = HierarchicalSwarm(
        name="Debug Swarm",
        description="A test swarm for debugging dashboard functionality",
        agents=[research, analysis],
        max_loops=1,
        interactive=True,
        verbose=True,
    )

    print("✅ Created swarm with dashboard")
    print("📊 Dashboard should now show agents in PENDING status")

    # Give the initial (all-PENDING) dashboard a moment on screen.
    time.sleep(3)

    print("\n🚀 Starting swarm execution...")

    result = swarm.run(
        task="Create a brief summary of machine learning"
    )

    print("\n✅ Debug completed!")
    print("📋 Final result preview:")
    rendered = str(result)
    print(
        rendered[:300] + "..." if len(rendered) > 300 else rendered
    )
|
||||
|
||||
# Script entry point: exercise the dashboard end-to-end.
if __name__ == "__main__":
    debug_dashboard()
@ -0,0 +1,71 @@
|
||||
"""
|
||||
Hierarchical Swarm with Arasaka Dashboard Example
|
||||
|
||||
This example demonstrates the new interactive dashboard functionality for the
|
||||
hierarchical swarm, featuring a futuristic Arasaka Corporation-style interface
|
||||
with red and black color scheme.
|
||||
"""
|
||||
|
||||
from swarms.structs.hiearchical_swarm import HierarchicalSwarm
|
||||
from swarms.structs.agent import Agent
|
||||
|
||||
|
||||
def main():
    """
    Demonstrate the hierarchical swarm with interactive dashboard.

    Builds three specialist agents, wires them into a HierarchicalSwarm
    with the Arasaka-style dashboard enabled, and runs the swarm (the
    task is collected interactively).
    """
    print("🚀 Initializing Swarms Corporation Hierarchical Swarm...")

    # Specialist roles as (agent name, description) pairs.
    roles = [
        (
            "Research-Analyst",
            "Specialized in comprehensive research and data gathering",
        ),
        (
            "Data-Analyst",
            "Expert in data analysis and pattern recognition",
        ),
        (
            "Strategy-Consultant",
            "Specialized in strategic planning and recommendations",
        ),
    ]
    specialists = [
        Agent(
            agent_name=role_name,
            agent_description=role_description,
            model_name="gpt-4o-mini",
            max_loops=1,
            verbose=False,
        )
        for role_name, role_description in roles
    ]

    # Create hierarchical swarm with interactive dashboard
    swarm = HierarchicalSwarm(
        name="Swarms Corporation Operations",
        description="Enterprise-grade hierarchical swarm for complex task execution",
        agents=specialists,
        max_loops=2,
        interactive=True,  # Enable the Arasaka dashboard
        verbose=True,
    )

    print("\n🎯 Swarm initialized successfully!")
    print(
        "📊 Interactive dashboard will be displayed during execution."
    )
    print(
        "💡 The swarm will prompt you for a task when you call swarm.run()"
    )

    # Run the swarm (task will be prompted interactively)
    result = swarm.run()

    print("\n✅ Swarm execution completed!")
    print("📋 Final result:")
    print(result)
|
||||
|
||||
# Script entry point: run the interactive dashboard demonstration.
if __name__ == "__main__":
    main()
@ -0,0 +1,56 @@
|
||||
"""
|
||||
Test script for the Arasaka Dashboard functionality.
|
||||
"""
|
||||
|
||||
from swarms.structs.hiearchical_swarm import HierarchicalSwarm
|
||||
from swarms.structs.agent import Agent
|
||||
|
||||
|
||||
def test_dashboard():
    """Test the dashboard functionality with a simple task."""
    # Two lightweight agents are enough to exercise the dashboard UI.
    agent_specs = [
        ("Test-Agent-1", "A test agent for dashboard verification"),
        ("Test-Agent-2", "Another test agent for dashboard verification"),
    ]
    test_agents = [
        Agent(
            agent_name=spec_name,
            agent_description=spec_description,
            model_name="gpt-4o-mini",
            max_loops=1,
            verbose=False,
        )
        for spec_name, spec_description in agent_specs
    ]

    # Create swarm with dashboard
    swarm = HierarchicalSwarm(
        name="Dashboard Test Swarm",
        agents=test_agents,
        max_loops=1,
        interactive=True,
        verbose=True,
    )

    print("🧪 Testing Arasaka Dashboard...")
    print("📊 Dashboard should appear and prompt for task input")

    # Run with a simple task
    result = swarm.run(
        task="Create a simple summary of artificial intelligence trends"
    )

    print("\n✅ Test completed!")
    print("📋 Result preview:")
    rendered = str(result)
    print(
        rendered[:500] + "..." if len(rendered) > 500 else rendered
    )
|
||||
|
||||
# Script entry point: run the dashboard smoke test.
if __name__ == "__main__":
    test_dashboard()
@ -0,0 +1,56 @@
|
||||
"""
|
||||
Test script for full agent output display in the Arasaka Dashboard.
|
||||
"""
|
||||
|
||||
from swarms.structs.hiearchical_swarm import HierarchicalSwarm
|
||||
from swarms.structs.agent import Agent
|
||||
|
||||
|
||||
def test_full_output():
    """Test the full output display functionality."""
    print("🔍 Testing full agent output display...")

    # Agents expected to produce substantial output for the dashboard.
    researcher = Agent(
        agent_name="Research-Agent",
        agent_description="A research agent that produces detailed output",
        model_name="gpt-4o-mini",
        max_loops=1,
        verbose=False,
    )
    analyst = Agent(
        agent_name="Analysis-Agent",
        agent_description="An analysis agent that provides comprehensive analysis",
        model_name="gpt-4o-mini",
        max_loops=1,
        verbose=False,
    )

    # Create swarm with dashboard and detailed view enabled
    swarm = HierarchicalSwarm(
        name="Full Output Test Swarm",
        description="A test swarm for verifying full agent output display",
        agents=[researcher, analyst],
        max_loops=1,
        interactive=True,
        verbose=True,
    )

    print("✅ Created swarm with detailed view enabled")
    print(
        "📊 Dashboard should show full agent outputs without truncation"
    )

    # Run with a task that will generate substantial output; the result
    # itself is discarded — this test only inspects the dashboard.
    swarm.run(
        task="Provide a comprehensive analysis of artificial intelligence trends in 2024, including detailed explanations of each trend"
    )

    print("\n✅ Test completed!")
    print("📋 Check the dashboard for full agent outputs")
|
||||
|
||||
# Script entry point: run the full-output dashboard test.
if __name__ == "__main__":
    test_full_output()
@ -0,0 +1,57 @@
|
||||
"""
|
||||
Test script for multi-loop agent tracking in the Arasaka Dashboard.
|
||||
"""
|
||||
|
||||
from swarms.structs.hiearchical_swarm import HierarchicalSwarm
|
||||
from swarms.structs.agent import Agent
|
||||
|
||||
|
||||
def test_multi_loop():
    """Test the multi-loop agent tracking functionality."""
    print("🔍 Testing multi-loop agent tracking...")

    # Two agents whose outputs will be tracked across several loops.
    researcher = Agent(
        agent_name="Research-Agent",
        agent_description="A research agent for multi-loop testing",
        model_name="gpt-4o-mini",
        max_loops=1,
        verbose=False,
    )
    analyst = Agent(
        agent_name="Analysis-Agent",
        agent_description="An analysis agent for multi-loop testing",
        model_name="gpt-4o-mini",
        max_loops=1,
        verbose=False,
    )

    # Create swarm with multiple loops
    swarm = HierarchicalSwarm(
        name="Multi-Loop Test Swarm",
        description="A test swarm for verifying multi-loop agent tracking",
        agents=[researcher, analyst],
        max_loops=3,  # Multiple loops to test history tracking
        interactive=True,
        verbose=True,
    )

    print("✅ Created swarm with multi-loop tracking")
    print(
        "📊 Dashboard should show agent outputs across multiple loops"
    )
    print("🔄 Each loop will add new rows to the monitoring matrix")

    # Run with a task that will benefit from multiple iterations; the
    # result is discarded — the dashboard history is what this verifies.
    swarm.run(
        task="Analyze the impact of artificial intelligence on healthcare, then refine the analysis with additional insights, and finally provide actionable recommendations"
    )

    print("\n✅ Multi-loop test completed!")
    print("📋 Check the dashboard for agent outputs across all loops")
|
||||
|
||||
# Script entry point: run the multi-loop tracking test.
if __name__ == "__main__":
    test_multi_loop()
@ -0,0 +1,265 @@
|
||||
"""
|
||||
Stagehand Browser Automation Agent for Swarms
|
||||
=============================================
|
||||
|
||||
This example demonstrates how to create a Swarms-compatible agent
|
||||
that wraps Stagehand's browser automation capabilities.
|
||||
|
||||
The StagehandAgent class inherits from the Swarms Agent base class
|
||||
and implements browser automation through natural language commands.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from swarms import Agent as SwarmsAgent
|
||||
from stagehand import Stagehand, StagehandConfig
|
||||
|
||||
load_dotenv()
|
||||
|
||||
|
||||
class WebData(BaseModel):
    """Schema for extracted web data.

    Pydantic model describing one extraction result: the source URL,
    the page title, the extracted text, and any extra key/value
    metadata (defaults to an empty dict).
    """

    url: str = Field(..., description="The URL of the page")
    title: str = Field(..., description="Page title")
    content: str = Field(..., description="Extracted content")
    # default_factory keeps each instance's metadata dict independent.
    metadata: Dict[str, Any] = Field(
        default_factory=dict, description="Additional metadata"
    )
|
||||
|
||||
class StagehandAgent(SwarmsAgent):
    """
    A Swarms agent that integrates Stagehand for browser automation.

    This agent can navigate websites, extract data, perform actions,
    and observe page elements using natural language instructions.
    """

    def __init__(
        self,
        agent_name: str = "StagehandBrowserAgent",
        browserbase_api_key: Optional[str] = None,
        browserbase_project_id: Optional[str] = None,
        model_name: str = "gpt-4o-mini",
        model_api_key: Optional[str] = None,
        env: str = "LOCAL",  # LOCAL or BROWSERBASE
        *args,
        **kwargs,
    ):
        """
        Initialize the StagehandAgent.

        Args:
            agent_name: Name of the agent
            browserbase_api_key: API key for Browserbase (if using cloud)
            browserbase_project_id: Project ID for Browserbase
            model_name: LLM model to use
            model_api_key: API key for the model
            env: Environment - LOCAL or BROWSERBASE
        """
        # Don't pass stagehand-specific args to parent
        super().__init__(agent_name=agent_name, *args, **kwargs)

        # Credentials fall back to environment variables so the agent
        # can be constructed without explicit keys in development.
        self.stagehand_config = StagehandConfig(
            env=env,
            api_key=browserbase_api_key
            or os.getenv("BROWSERBASE_API_KEY"),
            project_id=browserbase_project_id
            or os.getenv("BROWSERBASE_PROJECT_ID"),
            model_name=model_name,
            model_api_key=model_api_key
            or os.getenv("OPENAI_API_KEY"),
        )
        self.stagehand = None
        self._initialized = False

    async def _init_stagehand(self):
        """Initialize Stagehand instance (idempotent)."""
        if not self._initialized:
            self.stagehand = Stagehand(self.stagehand_config)
            await self.stagehand.init()
            self._initialized = True
            logger.info(
                f"Stagehand initialized for {self.agent_name}"
            )

    async def _close_stagehand(self):
        """Close Stagehand instance (no-op if not initialized)."""
        if self.stagehand and self._initialized:
            await self.stagehand.close()
            self._initialized = False
            logger.info(f"Stagehand closed for {self.agent_name}")

    def run(self, task: str, *args, **kwargs) -> str:
        """
        Execute a browser automation task.

        The task string should contain instructions like:
        - "Navigate to example.com and extract the main content"
        - "Go to google.com and search for 'AI agents'"
        - "Extract all company names from https://ycombinator.com"

        Args:
            task: Natural language description of the browser task

        Returns:
            String result of the task execution
        """
        return asyncio.run(self._async_run(task, *args, **kwargs))

    async def _async_run(self, task: str, *args, **kwargs) -> str:
        """Async implementation of run method."""
        try:
            await self._init_stagehand()

            # Parse the task to determine actions
            result = await self._execute_browser_task(task)

            return json.dumps(result, indent=2)

        except Exception as e:
            logger.error(f"Error in browser task: {str(e)}")
            return f"Error executing browser task: {str(e)}"
        finally:
            # Keep browser open for potential follow-up tasks
            pass

    async def _execute_browser_task(
        self, task: str
    ) -> Dict[str, Any]:
        """
        Execute a browser task based on natural language instructions.

        This method interprets the task and calls appropriate Stagehand methods.
        """
        page = self.stagehand.page
        result = {"task": task, "status": "completed", "data": {}}

        # Determine if task involves navigation
        if any(
            keyword in task.lower()
            for keyword in ["navigate", "go to", "visit", "open"]
        ):
            # Extract URL from task
            import re

            url_pattern = r"https?://[^\s]+"
            urls = re.findall(url_pattern, task)
            if not urls and any(
                domain in task for domain in [".com", ".org", ".net"]
            ):
                # Try to extract domain names
                domain_pattern = r"(\w+\.\w+)"
                domains = re.findall(domain_pattern, task)
                if domains:
                    urls = [f"https://{domain}" for domain in domains]

            if urls:
                url = urls[0]
                await page.goto(url)
                result["data"]["navigated_to"] = url
                logger.info(f"Navigated to {url}")

        # Determine action type
        if "extract" in task.lower():
            # Perform extraction
            extraction_prompt = task.replace("extract", "").strip()
            extracted = await page.extract(extraction_prompt)
            result["data"]["extracted"] = extracted
            result["action"] = "extract"

        elif "click" in task.lower() or "press" in task.lower():
            # Perform action
            action_result = await page.act(task)
            result["data"]["action_performed"] = str(action_result)
            result["action"] = "act"

        elif "search" in task.lower():
            # Perform search action
            search_query = (
                task.split("search for")[-1].strip().strip("'\"")
            )
            # First, find the search box
            search_box = await page.observe(
                "find the search input field"
            )
            if search_box:
                # Click on search box and type
                await page.act(f"click on {search_box[0]}")
                await page.act(f"type '{search_query}'")
                await page.act("press Enter")
            result["data"]["search_query"] = search_query
            result["action"] = "search"

        elif "observe" in task.lower() or "find" in task.lower():
            # Perform observation
            observation = await page.observe(task)
            result["data"]["observation"] = [
                {
                    "description": obs.description,
                    "selector": obs.selector,
                }
                for obs in observation
            ]
            result["action"] = "observe"

        else:
            # General action
            action_result = await page.act(task)
            result["data"]["action_result"] = str(action_result)
            result["action"] = "general"

        return result

    def cleanup(self):
        """Clean up browser resources (best-effort)."""
        if not self._initialized:
            return
        try:
            asyncio.run(self._close_stagehand())
        except RuntimeError as e:
            # asyncio.run() raises RuntimeError when called from inside a
            # running event loop, or during interpreter shutdown — cleanup
            # must not propagate in those situations.
            logger.warning(f"Could not close Stagehand cleanly: {e}")

    def __del__(self):
        """Ensure browser is closed on deletion."""
        try:
            self.cleanup()
        except Exception:
            # Finalizers can run during interpreter shutdown, when
            # asyncio/logging may already be torn down — never raise
            # from __del__.
            pass
||||
|
||||
# Example usage
if __name__ == "__main__":
    # Create a Stagehand browser agent
    browser_agent = StagehandAgent(
        agent_name="WebScraperAgent",
        model_name="gpt-4o-mini",
        env="LOCAL",  # Use LOCAL for Playwright, BROWSERBASE for cloud
    )

    # Release the browser even if one of the examples raises.
    try:
        # Example 1: Navigate and extract data
        print("Example 1: Basic navigation and extraction")
        result1 = browser_agent.run(
            "Navigate to https://news.ycombinator.com and extract the titles of the top 5 stories"
        )
        print(result1)
        print("\n" + "=" * 50 + "\n")

        # Example 2: Perform a search
        print("Example 2: Search on a website")
        result2 = browser_agent.run(
            "Go to google.com and search for 'Swarms AI framework'"
        )
        print(result2)
        print("\n" + "=" * 50 + "\n")

        # Example 3: Extract structured data
        print("Example 3: Extract specific information")
        result3 = browser_agent.run(
            "Navigate to https://example.com and extract the main heading and first paragraph"
        )
        print(result3)
    finally:
        # Clean up
        browser_agent.cleanup()
@ -0,0 +1,397 @@
|
||||
"""
|
||||
Stagehand Tools for Swarms Agent
|
||||
=================================
|
||||
|
||||
This example demonstrates how to create Stagehand browser automation tools
|
||||
that can be used by a standard Swarms Agent. Each Stagehand method (act,
|
||||
extract, observe) becomes a separate tool that the agent can use.
|
||||
|
||||
This approach gives the agent more fine-grained control over browser
|
||||
automation tasks.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from swarms import Agent
|
||||
from stagehand import Stagehand, StagehandConfig
|
||||
|
||||
load_dotenv()
|
||||
|
||||
|
||||
class BrowserState:
    """Singleton to manage browser state across tools.

    All module-level browser tools share one Stagehand instance through
    this class; ``__new__`` guarantees a single shared object.
    """

    _instance = None
    _stagehand = None
    _initialized = False

    def __new__(cls):
        # Lazily create the one shared instance.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    async def init_browser(
        self,
        env: str = "LOCAL",
        api_key: Optional[str] = None,
        project_id: Optional[str] = None,
        model_name: str = "gpt-4o-mini",
        model_api_key: Optional[str] = None,
    ):
        """Initialize the browser if not already initialized."""
        if self._initialized:
            return

        # Explicit arguments win; otherwise fall back to env vars.
        cfg = StagehandConfig(
            env=env,
            api_key=api_key or os.getenv("BROWSERBASE_API_KEY"),
            project_id=project_id
            or os.getenv("BROWSERBASE_PROJECT_ID"),
            model_name=model_name,
            model_api_key=model_api_key
            or os.getenv("OPENAI_API_KEY"),
        )
        self._stagehand = Stagehand(cfg)
        await self._stagehand.init()
        self._initialized = True
        logger.info("Stagehand browser initialized")

    async def get_page(self):
        """Get the current page instance."""
        if self._initialized:
            return self._stagehand.page
        raise RuntimeError(
            "Browser not initialized. Call init_browser first."
        )

    async def close(self):
        """Close the browser."""
        if not (self._initialized and self._stagehand):
            return
        await self._stagehand.close()
        self._initialized = False
        logger.info("Stagehand browser closed")
|
||||
|
||||
# Browser state instance
|
||||
browser_state = BrowserState()
|
||||
|
||||
|
||||
def navigate_browser(url: str) -> str:
    """Navigate the shared browser to *url*.

    Args:
        url: Destination address. If no scheme is given, https:// is assumed.

    Returns:
        str: Human-readable success message, or the failure reason.

    Example:
        >>> navigate_browser("google.com")
        'Successfully navigated to https://google.com'
    """
    return asyncio.run(_navigate_browser_async(url))


async def _navigate_browser_async(url: str) -> str:
    """Async worker behind navigate_browser."""
    try:
        await browser_state.init_browser()
        active_page = await browser_state.get_page()

        # Default to https when the caller omitted the scheme.
        url = (
            url
            if url.startswith(("http://", "https://"))
            else f"https://{url}"
        )

        await active_page.goto(url)
        return f"Successfully navigated to {url}"
    except Exception as exc:
        logger.error(f"Navigation error: {str(exc)}")
        return f"Failed to navigate to {url}: {str(exc)}"
|
||||
|
||||
|
||||
def browser_act(action: str) -> str:
    """Perform a natural-language *action* on the current page.

    Args:
        action: Plain-English instruction, e.g. 'click the submit button'
            or 'type hello@example.com in the email field'.

    Returns:
        str: Message describing the action and its result, or the failure reason.
    """
    return asyncio.run(_browser_act_async(action))


async def _browser_act_async(action: str) -> str:
    """Async worker behind browser_act."""
    try:
        await browser_state.init_browser()
        active_page = await browser_state.get_page()
        outcome = await active_page.act(action)
        return f"Action performed: {action}. Result: {outcome}"
    except Exception as exc:
        logger.error(f"Action error: {str(exc)}")
        return f"Failed to perform action '{action}': {str(exc)}"
|
||||
|
||||
|
||||
def browser_extract(query: str) -> str:
    """Extract information from the current page using natural language.

    Args:
        query: Description of what to pull out, e.g. 'extract all email
            addresses' or 'get the main article text'.

    Returns:
        str: JSON string of the extracted data when it is a dict/list,
            otherwise its string form; or the failure reason.
    """
    return asyncio.run(_browser_extract_async(query))


async def _browser_extract_async(query: str) -> str:
    """Async worker behind browser_extract."""
    try:
        await browser_state.init_browser()
        active_page = await browser_state.get_page()
        payload = await active_page.extract(query)

        # Agents consume JSON more reliably than repr() output.
        if isinstance(payload, (dict, list)):
            return json.dumps(payload, indent=2)
        return str(payload)
    except Exception as exc:
        logger.error(f"Extraction error: {str(exc)}")
        return f"Failed to extract '{query}': {str(exc)}"
|
||||
|
||||
|
||||
def browser_observe(query: str) -> str:
    """Find elements on the current page described in natural language.

    Args:
        query: Description of elements to locate, e.g. 'find the search box'
            or 'locate the submit button'.

    Returns:
        str: JSON list of found elements (description, selector, method),
            or the failure reason.
    """
    return asyncio.run(_browser_observe_async(query))


async def _browser_observe_async(query: str) -> str:
    """Async worker behind browser_observe."""
    try:
        await browser_state.init_browser()
        active_page = await browser_state.get_page()
        found = await active_page.observe(query)

        # Flatten the observation objects into plain dicts for JSON output.
        summary = [
            {
                "description": item.description,
                "selector": item.selector,
                "method": item.method,
            }
            for item in found
        ]
        return json.dumps(summary, indent=2)
    except Exception as exc:
        logger.error(f"Observation error: {str(exc)}")
        return f"Failed to observe '{query}': {str(exc)}"
|
||||
|
||||
|
||||
def browser_screenshot(filename: str = "screenshot.png") -> str:
    """Take a screenshot of the current web page.

    Args:
        filename: Output path for the PNG. Defaults to "screenshot.png".
            A ".png" extension is appended automatically if missing.

    Returns:
        str: Success message naming the saved file, or the failure reason.

    Example:
        >>> browser_screenshot("page_capture.png")
        'Screenshot saved to page_capture.png'
    """
    return asyncio.run(_browser_screenshot_async(filename))


async def _browser_screenshot_async(filename: str) -> str:
    """Async worker behind browser_screenshot."""
    try:
        await browser_state.init_browser()
        page = await browser_state.get_page()

        # Ensure .png extension
        if not filename.endswith(".png"):
            filename += ".png"

        # Stagehand does not expose screenshots directly; drop to the
        # underlying Playwright page object.
        playwright_page = page.page
        await playwright_page.screenshot(path=filename)

        # FIX: the message previously contained the literal "(unknown)"
        # instead of interpolating the filename (see the docstring examples).
        return f"Screenshot saved to {filename}"
    except Exception as e:
        logger.error(f"Screenshot error: {str(e)}")
        return f"Failed to take screenshot: {str(e)}"
|
||||
|
||||
|
||||
def close_browser() -> str:
    """Shut down the shared browser session.

    Returns:
        str: 'Browser closed successfully' on success, or the failure reason.
    """
    return asyncio.run(_close_browser_async())


async def _close_browser_async() -> str:
    """Async worker behind close_browser."""
    try:
        await browser_state.close()
        return "Browser closed successfully"
    except Exception as exc:
        logger.error(f"Close browser error: {str(exc)}")
        return f"Failed to close browser: {str(exc)}"
|
||||
|
||||
|
||||
# Example usage
if __name__ == "__main__":
    # Build a Swarms agent whose toolbox is the Stagehand helpers above.
    browser_agent = Agent(
        agent_name="BrowserAutomationAgent",
        model_name="gpt-4o-mini",
        max_loops=1,
        tools=[
            navigate_browser,
            browser_act,
            browser_extract,
            browser_observe,
            browser_screenshot,
            close_browser,
        ],
        system_prompt="""You are a web browser automation specialist. You can:
1. Navigate to websites using the navigate_browser tool
2. Perform actions like clicking and typing using the browser_act tool
3. Extract information from pages using the browser_extract tool
4. Find and observe elements using the browser_observe tool
5. Take screenshots using the browser_screenshot tool
6. Close the browser when done using the close_browser tool

Always start by navigating to a URL before trying to interact with a page.
Be specific in your actions and extractions. When done with tasks, close the browser.""",
    )

    # (title, task) pairs run in order; a divider is printed between runs.
    demos = [
        (
            "Example 1: Automated web research",
            "Go to hackernews (news.ycombinator.com) and extract the titles of the top 5 stories. Then take a screenshot.",
        ),
        (
            "Example 2: Perform a web search",
            "Navigate to google.com, search for 'Python web scraping best practices', and extract the first 3 search result titles",
        ),
        (
            "Example 3: Interact with a form",
            "Go to example.com and observe what elements are on the page. Then extract all the text content.",
        ),
    ]
    for index, (title, task) in enumerate(demos):
        print(title)
        print(browser_agent.run(task))
        if index < len(demos) - 1:
            print("\n" + "=" * 50 + "\n")

    # Clean up
    browser_agent.run("Close the browser")
|
@ -0,0 +1,263 @@
|
||||
"""
|
||||
Stagehand MCP Server Integration with Swarms
|
||||
============================================
|
||||
|
||||
This example demonstrates how to use the Stagehand MCP (Model Context Protocol)
|
||||
server with Swarms agents. The MCP server provides browser automation capabilities
|
||||
as standardized tools that can be discovered and used by agents.
|
||||
|
||||
Prerequisites:
|
||||
1. Install and run the Stagehand MCP server:
|
||||
cd stagehand-mcp-server
|
||||
npm install
|
||||
npm run build
|
||||
npm start
|
||||
|
||||
2. The server will start on http://localhost:3000/sse
|
||||
|
||||
Features:
|
||||
- Automatic tool discovery from MCP server
|
||||
- Multi-session browser management
|
||||
- Built-in screenshot resources
|
||||
- Prompt templates for common tasks
|
||||
"""
|
||||
|
||||
from typing import List
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from swarms import Agent
|
||||
|
||||
load_dotenv()
|
||||
|
||||
|
||||
class StagehandMCPAgent:
    """Thin wrapper wiring a Swarms ``Agent`` to a Stagehand MCP server.

    The MCP server exposes browser automation (navigate/act/extract/observe
    plus session management) as tools the agent discovers at the given URL.
    """

    def __init__(
        self,
        agent_name: str = "StagehandMCPAgent",
        mcp_server_url: str = "http://localhost:3000/sse",
        model_name: str = "gpt-4o-mini",
        max_loops: int = 1,
    ):
        """Create the wrapped agent.

        Args:
            agent_name: Display name for the agent.
            mcp_server_url: SSE endpoint of the running Stagehand MCP server.
            model_name: LLM model identifier.
            max_loops: Reasoning-loop budget for the agent.
        """
        self.agent = Agent(
            agent_name=agent_name,
            model_name=model_name,
            max_loops=max_loops,
            # Tool discovery happens against this endpoint.
            mcp_url=mcp_server_url,
            system_prompt="""You are a web browser automation specialist with access to Stagehand MCP tools.

Available tools from the MCP server:
- navigate: Navigate to a URL
- act: Perform actions on web pages (click, type, etc.)
- extract: Extract data from web pages
- observe: Find and observe elements on pages
- screenshot: Take screenshots
- createSession: Create new browser sessions for parallel tasks
- listSessions: List active browser sessions
- closeSession: Close browser sessions

For multi-page workflows, you can create multiple sessions.
Always be specific in your actions and extractions.
Remember to close sessions when done with them.""",
            verbose=True,
        )

    def run(self, task: str) -> str:
        """Execute one browser-automation task and return the agent's answer."""
        return self.agent.run(task)
|
||||
|
||||
|
||||
class MultiSessionBrowserSwarm:
    """Pool of role-specialized MCP-backed agents that split web tasks.

    Each agent is told to open its own browser session so the pool can
    work on several sites in parallel.
    """

    def __init__(
        self,
        mcp_server_url: str = "http://localhost:3000/sse",
        num_agents: int = 3,
    ):
        """Create up to three specialized browser agents.

        Args:
            mcp_server_url: SSE endpoint of the Stagehand MCP server.
            num_agents: How many agents to create (capped by available roles).
        """
        # Fixed roster of specializations; num_agents trims it from the front.
        roles = [
            (
                "DataExtractor",
                "You specialize in extracting structured data from websites.",
            ),
            (
                "FormFiller",
                "You specialize in filling out forms and interacting with web applications.",
            ),
            (
                "WebMonitor",
                "You specialize in monitoring websites for changes and capturing screenshots.",
            ),
        ]
        self.agents = [
            Agent(
                agent_name=f"{role}_{idx}",
                model_name="gpt-4o-mini",
                max_loops=1,
                mcp_url=mcp_server_url,
                system_prompt=f"""You are a web browser automation specialist. {specialization}

You have access to Stagehand MCP tools including:
- createSession: Create a new browser session
- navigate_session: Navigate to URLs in a specific session
- act_session: Perform actions in a specific session
- extract_session: Extract data from a specific session
- observe_session: Observe elements in a specific session
- closeSession: Close a session when done

Always create your own session for tasks to work independently from other agents.""",
                verbose=True,
            )
            for idx, (role, specialization) in enumerate(
                roles[:num_agents]
            )
        ]

    def distribute_tasks(self, tasks: List[str]) -> List[str]:
        """Run *tasks* round-robin across the agent pool.

        Args:
            tasks: Task descriptions, assigned in order.

        Returns:
            List[str]: One result per task, in the same order.
        """
        results = []
        for position, task in enumerate(tasks):
            worker = self.agents[position % len(self.agents)]
            logger.info(
                f"Assigning task to {worker.agent_name}: {task}"
            )
            results.append(worker.run(task))
        return results
|
||||
|
||||
|
||||
# Example usage
if __name__ == "__main__":
    divider = "\n" + "=" * 70 + "\n"

    print("=" * 70)
    print("Stagehand MCP Server Integration Examples")
    print("=" * 70)
    print(
        "\nMake sure the Stagehand MCP server is running on http://localhost:3000/sse"
    )
    print("Run: cd stagehand-mcp-server && npm start\n")

    # Example 1: Single agent with MCP tools
    print("\nExample 1: Single Agent with MCP Tools")
    print("-" * 40)

    research_agent = StagehandMCPAgent(
        agent_name="WebResearchAgent",
        mcp_server_url="http://localhost:3000/sse",
    )

    # Research task using MCP tools
    research_outcome = research_agent.run(
        """Navigate to news.ycombinator.com and extract the following:
        1. The titles of the top 5 stories
        2. Their points/scores
        3. Number of comments for each
        Then take a screenshot of the page."""
    )
    print(f"Result: {research_outcome}")

    print(divider)

    # Example 2: Multi-session parallel browsing
    print("Example 2: Multi-Session Parallel Browsing")
    print("-" * 40)

    parallel_agent = StagehandMCPAgent(
        agent_name="ParallelBrowserAgent",
        mcp_server_url="http://localhost:3000/sse",
    )

    parallel_outcome = parallel_agent.run(
        """Create 3 browser sessions and perform these tasks in parallel:
        1. Session 1: Go to github.com/trending and extract the top 3 trending repositories
        2. Session 2: Go to reddit.com/r/programming and extract the top 3 posts
        3. Session 3: Go to stackoverflow.com and extract the featured questions

        After extracting data from all sessions, close them."""
    )
    print(f"Result: {parallel_outcome}")

    print(divider)

    # Example 3: Multi-agent browser swarm
    print("Example 3: Multi-Agent Browser Swarm")
    print("-" * 40)

    # Create a swarm of specialized browser agents
    browser_swarm = MultiSessionBrowserSwarm(
        mcp_server_url="http://localhost:3000/sse",
        num_agents=3,
    )

    # Define tasks for the swarm
    swarm_tasks = [
        "Create a session, navigate to python.org, and extract information about the latest Python version and its key features",
        "Create a session, go to npmjs.com, search for 'stagehand', and extract information about the package including version and description",
        "Create a session, visit playwright.dev, and extract the main features and benefits listed on the homepage",
    ]

    print("Distributing tasks to browser swarm...")
    swarm_results = browser_swarm.distribute_tasks(swarm_tasks)

    for i, result in enumerate(swarm_results):
        print(f"\nTask {i+1} Result: {result}")

    print(divider)

    # Example 4: Complex workflow with session management
    print("Example 4: Complex Multi-Page Workflow")
    print("-" * 40)

    workflow_agent = StagehandMCPAgent(
        agent_name="WorkflowAgent",
        mcp_server_url="http://localhost:3000/sse",
        max_loops=2,  # Allow more complex reasoning
    )

    workflow_outcome = workflow_agent.run(
        """Perform a comprehensive analysis of AI frameworks:
        1. Create a new session
        2. Navigate to github.com/huggingface/transformers and extract the star count and latest release info
        3. In the same session, navigate to github.com/openai/gpt-3 and extract similar information
        4. Navigate to github.com/anthropics/anthropic-sdk-python and extract repository statistics
        5. Take screenshots of each repository page
        6. Compile a comparison report of all three repositories
        7. Close the session when done"""
    )
    print(f"Result: {workflow_outcome}")

    print("\n" + "=" * 70)
    print("All examples completed!")
    print("=" * 70)
|
@ -0,0 +1,371 @@
|
||||
"""
|
||||
Stagehand Multi-Agent Browser Automation Workflows
|
||||
=================================================
|
||||
|
||||
This example demonstrates advanced multi-agent workflows using Stagehand
|
||||
for complex browser automation scenarios. It shows how multiple agents
|
||||
can work together to accomplish sophisticated web tasks.
|
||||
|
||||
Use cases:
|
||||
1. E-commerce price monitoring across multiple sites
|
||||
2. Competitive analysis and market research
|
||||
3. Automated testing and validation workflows
|
||||
4. Data aggregation from multiple sources
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from swarms import Agent, SequentialWorkflow, ConcurrentWorkflow
|
||||
from swarms.structs.agent_rearrange import AgentRearrange
|
||||
from examples.stagehand.stagehand_wrapper_agent import StagehandAgent
|
||||
|
||||
load_dotenv()
|
||||
|
||||
|
||||
# Pydantic models for structured data
|
||||
class ProductInfo(BaseModel):
    """Product information schema."""

    # Identity and pricing as scraped from a retailer page.
    name: str = Field(..., description="Product name")
    price: float = Field(..., description="Product price")
    availability: str = Field(..., description="Availability status")
    url: str = Field(..., description="Product URL")
    # Optional because not every scrape captures a screenshot.
    screenshot_path: Optional[str] = Field(
        None, description="Screenshot file path"
    )


class MarketAnalysis(BaseModel):
    """Market analysis report schema."""

    # NOTE(review): datetime.now is naive local time — confirm UTC is not required.
    timestamp: datetime = Field(default_factory=datetime.now)
    products: List[ProductInfo] = Field(
        ..., description="List of products analyzed"
    )
    # Presumably keyed "min"/"max" — verify against the producing agent.
    price_range: Dict[str, float] = Field(
        ..., description="Min and max prices"
    )
    recommendations: List[str] = Field(
        ..., description="Analysis recommendations"
    )
|
||||
|
||||
|
||||
# Specialized browser agents
|
||||
class ProductScraperAgent(StagehandAgent):
    """StagehandAgent specialized for scraping product data from one site."""

    def __init__(self, site_name: str, *args, **kwargs):
        # Bake the site into the agent name so logs identify the scraper.
        scraper_name = f"ProductScraper_{site_name}"
        super().__init__(agent_name=scraper_name, *args, **kwargs)
        self.site_name = site_name


class PriceMonitorAgent(StagehandAgent):
    """StagehandAgent specialized for monitoring price changes."""

    def __init__(self, *args, **kwargs):
        super().__init__(
            agent_name="PriceMonitorAgent", *args, **kwargs
        )
|
||||
|
||||
|
||||
# Example 1: E-commerce Price Comparison Workflow
|
||||
def create_price_comparison_workflow():
    """Build a price-comparison pipeline: parallel scraping, then analysis.

    Returns:
        SequentialWorkflow: A concurrent Amazon/eBay scraping stage followed
        by a price-analysis agent.
    """
    # One local-browser scraper per marketplace.
    site_scrapers = [
        StagehandAgent(
            agent_name=f"{marketplace}ScraperAgent",
            model_name="gpt-4o-mini",
            env="LOCAL",
        )
        for marketplace in ("Amazon", "Ebay")
    ]

    analysis_agent = Agent(
        agent_name="PriceAnalysisAgent",
        model_name="gpt-4o-mini",
        system_prompt="""You are a price analysis expert. Analyze product prices from multiple sources
        and provide insights on the best deals, price trends, and recommendations.
        Focus on value for money and highlight any significant price differences.""",
    )

    # Scrape both sites concurrently, then feed everything to the analyst.
    scraping_stage = ConcurrentWorkflow(
        agents=site_scrapers,
        max_loops=1,
        verbose=True,
    )
    return SequentialWorkflow(
        agents=[scraping_stage, analysis_agent],
        max_loops=1,
        verbose=True,
    )
|
||||
|
||||
|
||||
# Example 2: Competitive Analysis Workflow
|
||||
def create_competitive_analysis_workflow():
    """Build a competitive-analysis pipeline over company websites.

    Returns:
        AgentRearrange: company research -> social media analysis -> report.
    """
    # Browser agent that gathers facts from company sites.
    company_researcher = StagehandAgent(
        agent_name="CompanyResearchAgent",
        model_name="gpt-4o-mini",
        env="LOCAL",
    )

    # Browser agent that inspects social media presence.
    social_media_agent = StagehandAgent(
        agent_name="SocialMediaAnalysisAgent",
        model_name="gpt-4o-mini",
        env="LOCAL",
    )

    # Plain LLM agent that writes the final report.
    report_compiler = Agent(
        agent_name="CompetitiveAnalysisReporter",
        model_name="gpt-4o-mini",
        system_prompt="""You are a competitive analysis expert. Compile comprehensive reports
        based on company information and social media presence data. Identify strengths,
        weaknesses, and market positioning for each company.""",
    )

    # Linear flow expressed as an AgentRearrange pattern string.
    return AgentRearrange(
        agents=[
            company_researcher,
            social_media_agent,
            report_compiler,
        ],
        flow="company_researcher -> social_media_agent -> report_compiler",
        verbose=True,
    )
|
||||
|
||||
|
||||
# Example 3: Automated Testing Workflow
|
||||
def create_automated_testing_workflow():
    """Build a web-app testing pipeline.

    Returns:
        SequentialWorkflow: UI, form, and accessibility tests run
        concurrently, then a reporter compiles the findings.
    """
    # Three specialized browser testers, one per test dimension.
    testers = [
        StagehandAgent(
            agent_name=tester_name,
            model_name="gpt-4o-mini",
            env="LOCAL",
        )
        for tester_name in (
            "UITestingAgent",
            "FormValidationAgent",
            "AccessibilityTestingAgent",
        )
    ]

    # Plain LLM agent that turns raw results into a report.
    test_reporter = Agent(
        agent_name="TestReportCompiler",
        model_name="gpt-4o-mini",
        system_prompt="""You are a QA test report specialist. Compile test results from
        UI, form validation, and accessibility testing into a comprehensive report.
        Highlight any failures, warnings, and provide recommendations for fixes.""",
    )

    # Concurrent testing followed by report generation.
    parallel_tests = ConcurrentWorkflow(
        agents=testers,
        max_loops=1,
        verbose=True,
    )
    return SequentialWorkflow(
        agents=[parallel_tests, test_reporter],
        max_loops=1,
        verbose=True,
    )
|
||||
|
||||
|
||||
# Example 4: News Aggregation and Sentiment Analysis
|
||||
def create_news_aggregation_workflow():
    """Build a news pipeline: parallel scraping -> sentiment -> trends.

    Returns:
        SequentialWorkflow: concurrent per-site scrapers feeding a sentiment
        analyzer and then a trend identifier.
    """
    news_sites = [
        ("TechCrunch", "https://techcrunch.com"),
        ("HackerNews", "https://news.ycombinator.com"),
        ("Reddit", "https://reddit.com/r/technology"),
    ]
    # One scraper per outlet; the URL lives in the task text, not the agent.
    news_scrapers = [
        StagehandAgent(
            agent_name=f"{site_name}Scraper",
            model_name="gpt-4o-mini",
            env="LOCAL",
        )
        for site_name, _ in news_sites
    ]

    sentiment_analyzer = Agent(
        agent_name="SentimentAnalyzer",
        model_name="gpt-4o-mini",
        system_prompt="""You are a sentiment analysis expert. Analyze news articles and posts
        to determine overall sentiment (positive, negative, neutral) and identify key themes
        and trends in the technology sector.""",
    )

    trend_identifier = Agent(
        agent_name="TrendIdentifier",
        model_name="gpt-4o-mini",
        system_prompt="""You are a trend analysis expert. Based on aggregated news and sentiment
        data, identify emerging trends, hot topics, and potential market movements in the
        technology sector.""",
    )

    # Parallel scraping feeds the two sequential analysis stages.
    scraping_stage = ConcurrentWorkflow(
        agents=news_scrapers,
        max_loops=1,
        verbose=True,
    )
    return SequentialWorkflow(
        agents=[
            scraping_stage,
            sentiment_analyzer,
            trend_identifier,
        ],
        max_loops=1,
        verbose=True,
    )
|
||||
|
||||
|
||||
# Main execution examples
if __name__ == "__main__":
    print("=" * 70)
    print("Stagehand Multi-Agent Workflow Examples")
    print("=" * 70)

    # Example 1: Price Comparison
    print("\nExample 1: E-commerce Price Comparison")
    print("-" * 40)

    price_workflow = create_price_comparison_workflow()

    # Search for a specific product across multiple sites
    price_result = price_workflow.run(
        """Search for 'iPhone 15 Pro Max 256GB' on:
        1. Amazon - extract price, availability, and seller information
        2. eBay - extract price range, number of listings, and average price
        Take screenshots of search results from both sites.
        Compare the prices and provide recommendations on where to buy."""
    )
    print(f"Price Comparison Result:\n{price_result}")

    print("\n" + "=" * 70 + "\n")

    # Example 2: Competitive Analysis
    print("Example 2: Competitive Analysis")
    print("-" * 40)

    competitive_workflow = create_competitive_analysis_workflow()

    competitive_result = competitive_workflow.run(
        """Analyze these three AI companies:
        1. OpenAI - visit openai.com and extract mission, products, and recent announcements
        2. Anthropic - visit anthropic.com and extract their AI safety approach and products
        3. DeepMind - visit deepmind.com and extract research focus and achievements

        Then check their Twitter/X presence and recent posts.
        Compile a competitive analysis report comparing their market positioning."""
    )
    print(f"Competitive Analysis Result:\n{competitive_result}")

    print("\n" + "=" * 70 + "\n")

    # Example 3: Automated Testing
    print("Example 3: Automated Web Testing")
    print("-" * 40)

    testing_workflow = create_automated_testing_workflow()

    test_result = testing_workflow.run(
        """Test the website example.com:
        1. UI Testing: Check if all main navigation links work, images load, and layout is responsive
        2. Form Testing: If there are any forms, test with valid and invalid inputs
        3. Accessibility: Check for alt texts, ARIA labels, and keyboard navigation

        Take screenshots of any issues found and compile a comprehensive test report."""
    )
    print(f"Test Results:\n{test_result}")

    print("\n" + "=" * 70 + "\n")

    # Example 4: News Aggregation
    print("Example 4: Tech News Aggregation and Analysis")
    print("-" * 40)

    news_workflow = create_news_aggregation_workflow()

    news_result = news_workflow.run(
        """For each news source:
        1. TechCrunch: Extract the top 5 headlines about AI or machine learning
        2. HackerNews: Extract the top 5 posts related to AI/ML with most points
        3. Reddit r/technology: Extract top 5 posts about AI from the past week

        Analyze sentiment and identify emerging trends in AI technology."""
    )
    print(f"News Analysis Result:\n{news_result}")

    # Cleanup all browser instances
    print("\n" + "=" * 70)
    print("Cleaning up browser instances...")

    def _cleanup_workflow(workflow) -> None:
        """Recursively close Stagehand browsers inside a (nested) workflow."""
        for member in getattr(workflow, "agents", []):
            if isinstance(member, StagehandAgent):
                member.cleanup()
            elif hasattr(member, "agents"):
                # Nested workflow (e.g. a ConcurrentWorkflow stage).
                _cleanup_workflow(member)

    # FIX: previously only price_workflow was cleaned up, leaking the
    # browsers started by the other three workflows.
    for workflow in (
        price_workflow,
        competitive_workflow,
        testing_workflow,
        news_workflow,
    ):
        _cleanup_workflow(workflow)

    print("All workflows completed!")
    print("=" * 70)
|
@ -0,0 +1,249 @@
|
||||
# Stagehand Browser Automation Integration for Swarms
|
||||
|
||||
This directory contains examples demonstrating how to integrate [Stagehand](https://github.com/browserbase/stagehand), an AI-powered browser automation framework, with the Swarms multi-agent framework.
|
||||
|
||||
## Overview
|
||||
|
||||
Stagehand provides natural language browser automation capabilities that can be seamlessly integrated into Swarms agents. This integration enables:
|
||||
|
||||
- 🌐 **Natural Language Web Automation**: Use simple commands like "click the submit button" or "extract product prices"
|
||||
- 🤖 **Multi-Agent Browser Workflows**: Multiple agents can automate different websites simultaneously
|
||||
- 🔧 **Flexible Integration Options**: Use as a wrapped agent, individual tools, or via MCP server
|
||||
- 📊 **Complex Automation Scenarios**: E-commerce monitoring, competitive analysis, automated testing, and more
|
||||
|
||||
## Examples
|
||||
|
||||
### 1. Stagehand Wrapper Agent (`1_stagehand_wrapper_agent.py`)
|
||||
|
||||
The simplest integration - wraps Stagehand as a Swarms-compatible agent.
|
||||
|
||||
```python
|
||||
from examples.stagehand.stagehand_wrapper_agent import StagehandAgent
|
||||
|
||||
# Create a browser automation agent
|
||||
browser_agent = StagehandAgent(
|
||||
agent_name="WebScraperAgent",
|
||||
model_name="gpt-4o-mini",
|
||||
env="LOCAL", # or "BROWSERBASE" for cloud execution
|
||||
)
|
||||
|
||||
# Use natural language to control the browser
|
||||
result = browser_agent.run(
|
||||
"Navigate to news.ycombinator.com and extract the top 5 story titles"
|
||||
)
|
||||
```
|
||||
|
||||
**Features:**
|
||||
- Inherits from Swarms `Agent` base class
|
||||
- Automatic browser lifecycle management
|
||||
- Natural language task interpretation
|
||||
- Support for both local (Playwright) and cloud (Browserbase) execution
|
||||
|
||||
### 2. Stagehand as Tools (`2_stagehand_tools_agent.py`)
|
||||
|
||||
Provides fine-grained control by exposing Stagehand methods as individual tools.
|
||||
|
||||
```python
|
||||
from swarms import Agent
|
||||
from examples.stagehand.stagehand_tools_agent import (
|
||||
NavigateTool, ActTool, ExtractTool, ObserveTool, ScreenshotTool
|
||||
)
|
||||
|
||||
# Create agent with browser tools
|
||||
browser_agent = Agent(
|
||||
agent_name="BrowserAutomationAgent",
|
||||
model_name="gpt-4o-mini",
|
||||
tools=[
|
||||
NavigateTool(),
|
||||
ActTool(),
|
||||
ExtractTool(),
|
||||
ObserveTool(),
|
||||
ScreenshotTool(),
|
||||
],
|
||||
)
|
||||
|
||||
# Agent can now use tools strategically
|
||||
result = browser_agent.run(
|
||||
"Go to google.com, search for 'Python tutorials', and extract the first 3 results"
|
||||
)
|
||||
```
|
||||
|
||||
**Available Tools:**
|
||||
- `NavigateTool`: Navigate to URLs
|
||||
- `ActTool`: Perform actions (click, type, scroll)
|
||||
- `ExtractTool`: Extract data from pages
|
||||
- `ObserveTool`: Find elements on pages
|
||||
- `ScreenshotTool`: Capture screenshots
|
||||
- `CloseBrowserTool`: Clean up browser resources
|
||||
|
||||
### 3. Stagehand MCP Server (`3_stagehand_mcp_agent.py`)
|
||||
|
||||
Integrates with Stagehand's Model Context Protocol (MCP) server for standardized tool access.
|
||||
|
||||
```python
|
||||
from examples.stagehand.stagehand_mcp_agent import StagehandMCPAgent
|
||||
|
||||
# Connect to Stagehand MCP server
|
||||
mcp_agent = StagehandMCPAgent(
|
||||
agent_name="WebResearchAgent",
|
||||
mcp_server_url="http://localhost:3000/sse",
|
||||
)
|
||||
|
||||
# Use MCP tools including multi-session management
|
||||
result = mcp_agent.run("""
|
||||
Create 3 browser sessions and:
|
||||
1. Session 1: Check Python.org for latest version
|
||||
2. Session 2: Check PyPI for trending packages
|
||||
3. Session 3: Check GitHub Python trending repos
|
||||
Compile a Python ecosystem status report.
|
||||
""")
|
||||
```
|
||||
|
||||
**MCP Features:**
|
||||
- Automatic tool discovery
|
||||
- Multi-session browser management
|
||||
- Built-in screenshot resources
|
||||
- Prompt templates for common tasks
|
||||
|
||||
### 4. Multi-Agent Workflows (`4_stagehand_multi_agent_workflow.py`)
|
||||
|
||||
Demonstrates complex multi-agent browser automation scenarios.
|
||||
|
||||
```python
|
||||
from examples.stagehand.stagehand_multi_agent_workflow import (
|
||||
create_price_comparison_workflow,
|
||||
create_competitive_analysis_workflow,
|
||||
create_automated_testing_workflow,
|
||||
create_news_aggregation_workflow
|
||||
)
|
||||
|
||||
# Price comparison across multiple e-commerce sites
|
||||
price_workflow = create_price_comparison_workflow()
|
||||
result = price_workflow.run(
|
||||
"Compare prices for iPhone 15 Pro on Amazon and eBay"
|
||||
)
|
||||
|
||||
# Competitive analysis of multiple companies
|
||||
competitive_workflow = create_competitive_analysis_workflow()
|
||||
result = competitive_workflow.run(
|
||||
"Analyze OpenAI, Anthropic, and DeepMind websites and social media"
|
||||
)
|
||||
```
|
||||
|
||||
**Workflow Examples:**
|
||||
- **E-commerce Monitoring**: Track prices across multiple sites
|
||||
- **Competitive Analysis**: Research competitors' websites and social media
|
||||
- **Automated Testing**: UI, form validation, and accessibility testing
|
||||
- **News Aggregation**: Collect and analyze news from multiple sources
|
||||
|
||||
## Setup
|
||||
|
||||
### Prerequisites
|
||||
|
||||
1. **Install Swarms and Stagehand:**
|
||||
```bash
|
||||
pip install swarms stagehand
|
||||
```
|
||||
|
||||
2. **Set up environment variables:**
|
||||
```bash
|
||||
# For local browser automation (using Playwright)
|
||||
export OPENAI_API_KEY="your-openai-key"
|
||||
|
||||
# For cloud browser automation (using Browserbase)
|
||||
export BROWSERBASE_API_KEY="your-browserbase-key"
|
||||
export BROWSERBASE_PROJECT_ID="your-project-id"
|
||||
```
|
||||
|
||||
3. **For MCP Server examples:**
|
||||
```bash
|
||||
# Install and run the Stagehand MCP server
|
||||
cd stagehand-mcp-server
|
||||
npm install
|
||||
npm run build
|
||||
npm start
|
||||
```
|
||||
|
||||
## Use Cases
|
||||
|
||||
### E-commerce Automation
|
||||
- Price monitoring and comparison
|
||||
- Inventory tracking
|
||||
- Automated purchasing workflows
|
||||
- Review aggregation
|
||||
|
||||
### Research and Analysis
|
||||
- Competitive intelligence gathering
|
||||
- Market research automation
|
||||
- Social media monitoring
|
||||
- News and trend analysis
|
||||
|
||||
### Quality Assurance
|
||||
- Automated UI testing
|
||||
- Cross-browser compatibility testing
|
||||
- Form validation testing
|
||||
- Accessibility compliance checking
|
||||
|
||||
### Data Collection
|
||||
- Web scraping at scale
|
||||
- Real-time data monitoring
|
||||
- Structured data extraction
|
||||
- Screenshot documentation
|
||||
|
||||
## Best Practices
|
||||
|
||||
1. **Resource Management**: Always clean up browser instances when done
|
||||
```python
|
||||
browser_agent.cleanup() # For wrapper agents
|
||||
```
|
||||
|
||||
2. **Error Handling**: Stagehand includes self-healing capabilities, but wrap critical operations in try-except blocks
|
||||
|
||||
3. **Parallel Execution**: Use `ConcurrentWorkflow` for simultaneous browser automation across multiple sites
|
||||
|
||||
4. **Session Management**: For complex multi-page workflows, use the MCP server's session management capabilities
|
||||
|
||||
5. **Rate Limiting**: Be respectful of websites - add delays between requests when necessary
|
||||
|
||||
## Testing
|
||||
|
||||
Run the test suite to verify the integration:
|
||||
|
||||
```bash
|
||||
pytest tests/stagehand/test_stagehand_integration.py -v
|
||||
```
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
### Common Issues
|
||||
|
||||
1. **Browser not starting**: Ensure Playwright is properly installed
|
||||
```bash
|
||||
playwright install
|
||||
```
|
||||
|
||||
2. **MCP connection failed**: Verify the MCP server is running on the correct port
|
||||
|
||||
3. **Timeout errors**: Increase timeout in StagehandConfig or agent initialization
|
||||
|
||||
### Debug Mode
|
||||
|
||||
Enable verbose logging:
|
||||
```python
|
||||
agent = StagehandAgent(
|
||||
agent_name="DebugAgent",
|
||||
verbose=True, # Enable detailed logging
|
||||
)
|
||||
```
|
||||
|
||||
## Contributing
|
||||
|
||||
We welcome contributions! Please:
|
||||
1. Follow the existing code style
|
||||
2. Add tests for new features
|
||||
3. Update documentation
|
||||
4. Submit PRs with clear descriptions
|
||||
|
||||
## License
|
||||
|
||||
These examples are provided under the same license as the Swarms framework. Stagehand is licensed separately - see [Stagehand's repository](https://github.com/browserbase/stagehand) for details.
|
@ -0,0 +1,13 @@
|
||||
# Requirements for Stagehand integration examples
|
||||
swarms>=8.0.0
|
||||
stagehand>=0.1.0
|
||||
python-dotenv>=1.0.0
|
||||
pydantic>=2.0.0
|
||||
loguru>=0.7.0
|
||||
|
||||
# For MCP server examples (optional)
|
||||
httpx>=0.24.0
|
||||
|
||||
# For testing
|
||||
pytest>=7.0.0
|
||||
pytest-asyncio>=0.21.0
|
@ -0,0 +1,436 @@
|
||||
"""
|
||||
Tests for Stagehand Integration with Swarms
|
||||
==========================================
|
||||
|
||||
This module contains tests for the Stagehand browser automation
|
||||
integration with the Swarms framework.
|
||||
"""
|
||||
|
||||
import json
|
||||
import pytest
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
|
||||
# Mock Stagehand classes
|
||||
class MockObserveResult:
    """Lightweight stand-in for a Stagehand ``observe()`` result item."""

    def __init__(self, description, selector, method="click"):
        # Mirror the three attributes the real result object exposes.
        self.description, self.selector, self.method = (
            description,
            selector,
            method,
        )
|
||||
|
||||
|
||||
class MockStagehandPage:
    """Async stub that mimics the Stagehand page API for tests."""

    async def goto(self, url):
        """Pretend to navigate; the real goto returns nothing."""
        return None

    async def act(self, action):
        """Echo the requested action back in a fixed message."""
        return f"Performed action: {action}"

    async def extract(self, query):
        """Return a canned extraction payload echoing the query."""
        return {"extracted": query, "data": ["item1", "item2"]}

    async def observe(self, query):
        """Return two fixed observations regardless of the query."""
        fixtures = (
            ("Search box", "#search-input"),
            ("Submit button", "#submit-btn"),
        )
        return [MockObserveResult(desc, sel) for desc, sel in fixtures]
|
||||
|
||||
|
||||
class MockStagehand:
    """Stub Stagehand client: stores its config and exposes a mock page."""

    def __init__(self, config):
        self.config = config
        self.page = MockStagehandPage()

    async def init(self):
        """No-op startup hook (real client launches a browser here)."""

    async def close(self):
        """No-op shutdown hook (real client tears the browser down)."""
|
||||
|
||||
|
||||
# Test StagehandAgent wrapper
|
||||
class TestStagehandAgent:
    """Exercise the StagehandAgent wrapper with Stagehand mocked out."""

    @staticmethod
    def _make_agent():
        """Build a StagehandAgent with the shared test configuration.

        The import is deferred so it happens inside the per-test
        ``patch`` context, where Stagehand is already replaced.
        """
        from examples.stagehand.stagehand_wrapper_agent import (
            StagehandAgent,
        )

        return StagehandAgent(
            agent_name="TestAgent",
            model_name="gpt-4o-mini",
            env="LOCAL",
        )

    @patch(
        "examples.stagehand.stagehand_wrapper_agent.Stagehand",
        MockStagehand,
    )
    def test_agent_initialization(self):
        """A fresh agent carries its config and starts uninitialized."""
        wrapper = self._make_agent()

        assert wrapper.agent_name == "TestAgent"
        assert wrapper.stagehand_config.env == "LOCAL"
        assert wrapper.stagehand_config.model_name == "gpt-4o-mini"
        assert not wrapper._initialized

    @patch(
        "examples.stagehand.stagehand_wrapper_agent.Stagehand",
        MockStagehand,
    )
    def test_navigation_task(self):
        """Navigation tasks report the target URL plus extracted data."""
        wrapper = self._make_agent()

        payload = json.loads(
            wrapper.run(
                "Navigate to example.com and extract the main content"
            )
        )

        assert payload["status"] == "completed"
        assert "navigated_to" in payload["data"]
        assert (
            payload["data"]["navigated_to"] == "https://example.com"
        )
        assert "extracted" in payload["data"]

    @patch(
        "examples.stagehand.stagehand_wrapper_agent.Stagehand",
        MockStagehand,
    )
    def test_search_task(self):
        """Search tasks record the query text and a 'search' action."""
        wrapper = self._make_agent()

        payload = json.loads(
            wrapper.run("Go to google.com and search for 'test query'")
        )

        assert payload["status"] == "completed"
        assert payload["data"]["search_query"] == "test query"
        assert payload["action"] == "search"

    @patch(
        "examples.stagehand.stagehand_wrapper_agent.Stagehand",
        MockStagehand,
    )
    def test_cleanup(self):
        """cleanup() closes the browser; the agent can then run again."""
        wrapper = self._make_agent()

        # First run lazily initializes the browser.
        wrapper.run("Navigate to example.com")
        assert wrapper._initialized

        wrapper.cleanup()

        # After cleanup a new run must still produce a result.
        assert wrapper.run("Navigate to example.com") is not None
|
||||
|
||||
|
||||
# Test Stagehand Tools
|
||||
class TestStagehandTools:
    """Test individual Stagehand tools.

    Every test patches the module-level ``browser_state`` so no real
    browser is launched; page interactions go through AsyncMock.

    Fix: these are coroutine tests, so each needs the
    ``@pytest.mark.asyncio`` marker (pytest-asyncio is already a test
    dependency and ``TestIntegration`` uses the same marker). Without
    the marker, pytest collects the coroutines but never awaits them,
    so the assertions silently never run.
    """

    @pytest.mark.asyncio
    @patch("examples.stagehand.stagehand_tools_agent.browser_state")
    async def test_navigate_tool(self, mock_browser_state):
        """NavigateTool calls page.goto and reports success."""
        from examples.stagehand.stagehand_tools_agent import (
            NavigateTool,
        )

        # Setup mock: get_page hands back an AsyncMock page.
        mock_page = AsyncMock()
        mock_browser_state.get_page = AsyncMock(
            return_value=mock_page
        )
        mock_browser_state.init_browser = AsyncMock()

        tool = NavigateTool()
        result = await tool._async_run("https://example.com")

        assert (
            "Successfully navigated to https://example.com" in result
        )
        mock_page.goto.assert_called_once_with("https://example.com")

    @pytest.mark.asyncio
    @patch("examples.stagehand.stagehand_tools_agent.browser_state")
    async def test_act_tool(self, mock_browser_state):
        """ActTool forwards the action string to page.act."""
        from examples.stagehand.stagehand_tools_agent import ActTool

        mock_page = AsyncMock()
        mock_page.act = AsyncMock(return_value="Action completed")
        mock_browser_state.get_page = AsyncMock(
            return_value=mock_page
        )
        mock_browser_state.init_browser = AsyncMock()

        tool = ActTool()
        result = await tool._async_run("click the button")

        assert "Action performed" in result
        assert "click the button" in result
        mock_page.act.assert_called_once_with("click the button")

    @pytest.mark.asyncio
    @patch("examples.stagehand.stagehand_tools_agent.browser_state")
    async def test_extract_tool(self, mock_browser_state):
        """ExtractTool returns page.extract's payload as a JSON string."""
        from examples.stagehand.stagehand_tools_agent import (
            ExtractTool,
        )

        mock_page = AsyncMock()
        mock_page.extract = AsyncMock(
            return_value={
                "title": "Test Page",
                "content": "Test content",
            }
        )
        mock_browser_state.get_page = AsyncMock(
            return_value=mock_page
        )
        mock_browser_state.init_browser = AsyncMock()

        tool = ExtractTool()
        result = await tool._async_run("extract the page title")

        # Result should be a JSON string that round-trips to the dict.
        parsed_result = json.loads(result)
        assert parsed_result["title"] == "Test Page"
        assert parsed_result["content"] == "Test content"

    @pytest.mark.asyncio
    @patch("examples.stagehand.stagehand_tools_agent.browser_state")
    async def test_observe_tool(self, mock_browser_state):
        """ObserveTool serializes observation objects to JSON."""
        from examples.stagehand.stagehand_tools_agent import (
            ObserveTool,
        )

        mock_page = AsyncMock()
        mock_page.observe = AsyncMock(
            return_value=[
                MockObserveResult("Search input", "#search"),
                MockObserveResult("Submit button", "#submit"),
            ]
        )
        mock_browser_state.get_page = AsyncMock(
            return_value=mock_page
        )
        mock_browser_state.init_browser = AsyncMock()

        tool = ObserveTool()
        result = await tool._async_run("find the search box")

        # Result should be a JSON array of observation dicts.
        parsed_result = json.loads(result)
        assert len(parsed_result) == 2
        assert parsed_result[0]["description"] == "Search input"
        assert parsed_result[0]["selector"] == "#search"
|
||||
|
||||
|
||||
# Test MCP integration
|
||||
class TestStagehandMCP:
|
||||
"""Test Stagehand MCP server integration."""
|
||||
|
||||
def test_mcp_agent_initialization(self):
|
||||
"""Test that MCP agent initializes with correct parameters."""
|
||||
from examples.stagehand.stagehand_mcp_agent import (
|
||||
StagehandMCPAgent,
|
||||
)
|
||||
|
||||
mcp_agent = StagehandMCPAgent(
|
||||
agent_name="TestMCPAgent",
|
||||
mcp_server_url="http://localhost:3000/sse",
|
||||
model_name="gpt-4o-mini",
|
||||
)
|
||||
|
||||
assert mcp_agent.agent.agent_name == "TestMCPAgent"
|
||||
assert mcp_agent.agent.mcp_url == "http://localhost:3000/sse"
|
||||
assert mcp_agent.agent.model_name == "gpt-4o-mini"
|
||||
|
||||
def test_multi_session_swarm_creation(self):
|
||||
"""Test multi-session browser swarm creation."""
|
||||
from examples.stagehand.stagehand_mcp_agent import (
|
||||
MultiSessionBrowserSwarm,
|
||||
)
|
||||
|
||||
swarm = MultiSessionBrowserSwarm(
|
||||
mcp_server_url="http://localhost:3000/sse",
|
||||
num_agents=3,
|
||||
)
|
||||
|
||||
assert len(swarm.agents) == 3
|
||||
assert swarm.agents[0].agent_name == "DataExtractor_0"
|
||||
assert swarm.agents[1].agent_name == "FormFiller_1"
|
||||
assert swarm.agents[2].agent_name == "WebMonitor_2"
|
||||
|
||||
@patch("swarms.Agent.run")
|
||||
def test_task_distribution(self, mock_run):
|
||||
"""Test task distribution among swarm agents."""
|
||||
from examples.stagehand.stagehand_mcp_agent import (
|
||||
MultiSessionBrowserSwarm,
|
||||
)
|
||||
|
||||
mock_run.return_value = "Task completed"
|
||||
|
||||
swarm = MultiSessionBrowserSwarm(num_agents=2)
|
||||
tasks = ["Task 1", "Task 2", "Task 3"]
|
||||
|
||||
results = swarm.distribute_tasks(tasks)
|
||||
|
||||
assert len(results) == 3
|
||||
assert all(result == "Task completed" for result in results)
|
||||
assert mock_run.call_count == 3
|
||||
|
||||
|
||||
# Test multi-agent workflows
|
||||
class TestMultiAgentWorkflows:
|
||||
"""Test multi-agent workflow configurations."""
|
||||
|
||||
@patch(
|
||||
"examples.stagehand.stagehand_wrapper_agent.Stagehand",
|
||||
MockStagehand,
|
||||
)
|
||||
def test_price_comparison_workflow_creation(self):
|
||||
"""Test creation of price comparison workflow."""
|
||||
from examples.stagehand.stagehand_multi_agent_workflow import (
|
||||
create_price_comparison_workflow,
|
||||
)
|
||||
|
||||
workflow = create_price_comparison_workflow()
|
||||
|
||||
# Should be a SequentialWorkflow with 2 agents
|
||||
assert len(workflow.agents) == 2
|
||||
# First agent should be a ConcurrentWorkflow
|
||||
assert hasattr(workflow.agents[0], "agents")
|
||||
# Second agent should be the analysis agent
|
||||
assert workflow.agents[1].agent_name == "PriceAnalysisAgent"
|
||||
|
||||
@patch(
|
||||
"examples.stagehand.stagehand_wrapper_agent.Stagehand",
|
||||
MockStagehand,
|
||||
)
|
||||
def test_competitive_analysis_workflow_creation(self):
|
||||
"""Test creation of competitive analysis workflow."""
|
||||
from examples.stagehand.stagehand_multi_agent_workflow import (
|
||||
create_competitive_analysis_workflow,
|
||||
)
|
||||
|
||||
workflow = create_competitive_analysis_workflow()
|
||||
|
||||
# Should have 3 agents in the rearrange pattern
|
||||
assert len(workflow.agents) == 3
|
||||
assert (
|
||||
workflow.flow
|
||||
== "company_researcher -> social_media_agent -> report_compiler"
|
||||
)
|
||||
|
||||
@patch(
|
||||
"examples.stagehand.stagehand_wrapper_agent.Stagehand",
|
||||
MockStagehand,
|
||||
)
|
||||
def test_automated_testing_workflow_creation(self):
|
||||
"""Test creation of automated testing workflow."""
|
||||
from examples.stagehand.stagehand_multi_agent_workflow import (
|
||||
create_automated_testing_workflow,
|
||||
)
|
||||
|
||||
workflow = create_automated_testing_workflow()
|
||||
|
||||
# Should be a SequentialWorkflow
|
||||
assert len(workflow.agents) == 2
|
||||
# First should be concurrent testing
|
||||
assert hasattr(workflow.agents[0], "agents")
|
||||
assert (
|
||||
len(workflow.agents[0].agents) == 3
|
||||
) # UI, Form, Accessibility testers
|
||||
|
||||
@patch(
|
||||
"examples.stagehand.stagehand_wrapper_agent.Stagehand",
|
||||
MockStagehand,
|
||||
)
|
||||
def test_news_aggregation_workflow_creation(self):
|
||||
"""Test creation of news aggregation workflow."""
|
||||
from examples.stagehand.stagehand_multi_agent_workflow import (
|
||||
create_news_aggregation_workflow,
|
||||
)
|
||||
|
||||
workflow = create_news_aggregation_workflow()
|
||||
|
||||
# Should be a SequentialWorkflow with 3 stages
|
||||
assert len(workflow.agents) == 3
|
||||
# First stage should be concurrent scrapers
|
||||
assert hasattr(workflow.agents[0], "agents")
|
||||
assert len(workflow.agents[0].agents) == 3 # 3 news sources
|
||||
|
||||
|
||||
# Integration tests
|
||||
class TestIntegration:
    """End-to-end integration tests against the mocked browser."""

    @pytest.mark.asyncio
    @patch(
        "examples.stagehand.stagehand_wrapper_agent.Stagehand",
        MockStagehand,
    )
    async def test_full_browser_automation_flow(self):
        """Navigate, extract, and observe in one agent lifecycle."""
        from examples.stagehand.stagehand_wrapper_agent import (
            StagehandAgent,
        )

        bot = StagehandAgent(
            agent_name="IntegrationTestAgent",
            model_name="gpt-4o-mini",
            env="LOCAL",
        )

        # Each task's result must carry its characteristic marker key.
        for task, marker in (
            ("Navigate to example.com", "navigated_to"),
            ("Extract all text from the page", "extracted"),
            ("Find all buttons on the page", "observation"),
        ):
            assert marker in bot.run(task)

        bot.cleanup()
|
||||
|
||||
|
||||
# Support direct execution: `python <this file>` delegates to pytest.
if __name__ == "__main__":
    pytest.main([__file__, "-v"])
|
@ -0,0 +1,302 @@
|
||||
"""
|
||||
Simple tests for Stagehand Integration with Swarms
|
||||
=================================================
|
||||
|
||||
These tests verify the basic structure and functionality of the
|
||||
Stagehand integration without requiring external dependencies.
|
||||
"""
|
||||
|
||||
import json
|
||||
import pytest
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
|
||||
class TestStagehandIntegrationStructure:
    """Verify the example directory layout and per-file contents."""

    @staticmethod
    def _read(path):
        """Return the full text of *path*."""
        with open(path, "r") as fh:
            return fh.read()

    def test_examples_directory_exists(self):
        """The examples/stagehand directory and its files must exist."""
        import os

        base_path = "examples/stagehand"
        assert os.path.exists(base_path)

        for name in (
            "1_stagehand_wrapper_agent.py",
            "2_stagehand_tools_agent.py",
            "3_stagehand_mcp_agent.py",
            "4_stagehand_multi_agent_workflow.py",
            "README.md",
            "requirements.txt",
        ):
            assert os.path.exists(
                os.path.join(base_path, name)
            ), f"Missing file: {name}"

    def test_wrapper_agent_imports(self):
        """Wrapper example must import Agent and define StagehandAgent."""
        text = self._read(
            "examples/stagehand/1_stagehand_wrapper_agent.py"
        )

        for snippet in (
            "from swarms import Agent",
            "import asyncio",
            "import json",
            "class StagehandAgent",
        ):
            assert snippet in text

    def test_tools_agent_imports(self):
        """Tools example must expose the function-based browser tools."""
        text = self._read(
            "examples/stagehand/2_stagehand_tools_agent.py"
        )

        for snippet in (
            "from swarms import Agent",
            "def navigate_browser",
            "def browser_act",
            "def browser_extract",
        ):
            assert snippet in text

    def test_mcp_agent_imports(self):
        """MCP example must define the MCP agent and use mcp_url."""
        text = self._read(
            "examples/stagehand/3_stagehand_mcp_agent.py"
        )

        for snippet in (
            "from swarms import Agent",
            "class StagehandMCPAgent",
            "mcp_url",
        ):
            assert snippet in text

    def test_workflow_agent_imports(self):
        """Workflow example must import the workflow building blocks."""
        text = self._read(
            "examples/stagehand/4_stagehand_multi_agent_workflow.py"
        )

        assert (
            "from swarms import Agent, SequentialWorkflow, ConcurrentWorkflow"
            in text
        )
        assert (
            "from swarms.structs.agent_rearrange import AgentRearrange"
            in text
        )
|
||||
|
||||
|
||||
class TestStagehandMockIntegration:
    """Exercise the integration's pure-Python pieces with mocks only."""

    def test_mock_stagehand_initialization(self):
        """A MagicMock can stand in for Stagehand's constructor."""

        # Build the mock class without importing stagehand itself.
        fake_cls = MagicMock()
        fake_client = MagicMock()
        fake_client.init = MagicMock()
        fake_cls.return_value = fake_client

        # "Constructing" via the mock must yield a usable instance.
        client = fake_cls(MagicMock())

        assert client is not None
        assert hasattr(client, "init")

    def test_json_serialization(self):
        """Agent-style response dicts must round-trip through JSON."""

        # Shape of a typical browser-automation response.
        sample = {
            "task": "Navigate to example.com",
            "status": "completed",
            "data": {
                "navigated_to": "https://example.com",
                "extracted": ["item1", "item2"],
                "action": "navigate",
            },
        }

        encoded = json.dumps(sample, indent=2)
        assert isinstance(encoded, str)

        decoded = json.loads(encoded)
        assert decoded["task"] == "Navigate to example.com"
        assert decoded["status"] == "completed"
        assert len(decoded["data"]["extracted"]) == 2

    def test_url_extraction_logic(self):
        """URLs (or bare domains) must be extractable from task text."""
        import re

        url_pattern = r"https?://[^\s]+"
        domain_pattern = r"(\w+\.\w+)"

        cases = [
            (
                "Navigate to https://example.com",
                ["https://example.com"],
            ),
            ("Go to google.com and search", ["google.com"]),
            (
                "Visit https://github.com/repo",
                ["https://github.com/repo"],
            ),
            ("Open example.org", ["example.org"]),
        ]

        for task, expected in cases:
            # Prefer full URLs; fall back to bare domain matches.
            urls = re.findall(url_pattern, task) or re.findall(
                domain_pattern, task
            )

            assert (
                len(urls) > 0
            ), f"Failed to extract URL from: {task}"
            assert (
                urls[0] in expected
            ), f"Expected {expected}, got {urls}"
|
||||
|
||||
|
||||
class TestSwarmsPatternsCompliance:
    """Check the examples follow Swarms framework coding patterns."""

    @staticmethod
    def _read(path):
        """Return the full text of *path*."""
        with open(path, "r") as fh:
            return fh.read()

    def test_agent_inheritance_pattern(self):
        """Wrapper agent must subclass the Swarms Agent base class."""
        text = self._read(
            "examples/stagehand/1_stagehand_wrapper_agent.py"
        )

        for snippet in (
            "class StagehandAgent(SwarmsAgent):",
            "def run(self, task: str",
            "return",
        ):
            assert snippet in text

    def test_tools_pattern(self):
        """Tools must be plain functions with annotated signatures."""
        text = self._read(
            "examples/stagehand/2_stagehand_tools_agent.py"
        )

        for snippet in (
            "def navigate_browser(url: str) -> str:",
            "def browser_act(action: str) -> str:",
            "def browser_extract(query: str) -> str:",
            "def browser_observe(query: str) -> str:",
        ):
            assert snippet in text

    def test_mcp_integration_pattern(self):
        """MCP example must wire an Agent to an mcp_url."""
        text = self._read(
            "examples/stagehand/3_stagehand_mcp_agent.py"
        )

        assert "mcp_url=" in text
        assert "Agent(" in text

    def test_workflow_patterns(self):
        """Workflow example must use all three orchestration types."""
        text = self._read(
            "examples/stagehand/4_stagehand_multi_agent_workflow.py"
        )

        for snippet in (
            "SequentialWorkflow",
            "ConcurrentWorkflow",
            "AgentRearrange",
        ):
            assert snippet in text
|
||||
|
||||
|
||||
class TestDocumentationAndExamples:
    """Check documentation and example-file completeness."""

    def test_readme_completeness(self):
        """README must contain every essential section heading."""

        with open("examples/stagehand/README.md", "r") as fh:
            readme = fh.read()

        for section in (
            "# Stagehand Browser Automation Integration",
            "## Overview",
            "## Examples",
            "## Setup",
            "## Use Cases",
            "## Best Practices",
        ):
            assert section in readme, f"Missing section: {section}"

    def test_requirements_file(self):
        """requirements.txt must pin all core dependencies."""

        with open("examples/stagehand/requirements.txt", "r") as fh:
            reqs = fh.read()

        for dep in (
            "swarms",
            "stagehand",
            "python-dotenv",
            "pydantic",
            "loguru",
        ):
            assert dep in reqs, f"Missing dependency: {dep}"

    def test_example_files_have_docstrings(self):
        """Each example needs a module docstring and a main guard."""

        for file_path in (
            "examples/stagehand/1_stagehand_wrapper_agent.py",
            "examples/stagehand/2_stagehand_tools_agent.py",
            "examples/stagehand/3_stagehand_mcp_agent.py",
            "examples/stagehand/4_stagehand_multi_agent_workflow.py",
        ):
            with open(file_path, "r") as fh:
                text = fh.read()

            # Docstring must appear near the top of the module.
            assert (
                '"""' in text[:500]
            ), f"Missing docstring in {file_path}"
            assert (
                'if __name__ == "__main__":' in text
            ), f"Missing main block in {file_path}"
|
||||
|
||||
|
||||
# Support direct execution: `python <this file>` delegates to pytest.
if __name__ == "__main__":
    pytest.main([__file__, "-v"])
|
@ -0,0 +1,24 @@
|
||||
"""Minimal HierarchicalSwarm example with the interactive dashboard."""

from swarms import HierarchicalSwarm, Agent

# Worker agents the director can delegate to.
researcher = Agent(
    agent_name="Research-Analyst", model_name="gpt-4.1", print_on=True
)
analyst = Agent(
    agent_name="Data-Analyst", model_name="gpt-4.1", print_on=True
)

# Swarm with the interactive dashboard enabled.
swarm = HierarchicalSwarm(
    agents=[researcher, analyst],
    max_loops=1,
    interactive=True,  # Enable the Arasaka dashboard
    # director_reasoning_enabled=False,
    # director_reasoning_model_name="groq/moonshotai/kimi-k2-instruct",
    multi_agent_prompt_improvements=True,
)

# Run the swarm on a fixed task and print the aggregated result.
outcome = swarm.run("what are the best nanomachine research papers?")

print(outcome)
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in new issue