Merge branch 'kyegomez:master' into frames

pull/1003/head
CI-DEV 2 weeks ago committed by GitHub
commit 0e6c011bd1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -17,7 +17,7 @@ jobs:
&& ${{ contains(github.event.pull_request.labels.*.name, 'release') }}
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
- name: Install poetry
run: pipx install poetry==$POETRY_VERSION
- name: Set up Python 3.9

@ -21,7 +21,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v5
# Execute Codacy Analysis CLI and generate a SARIF output with the security issues identified during the analysis
- name: Run Codacy Analysis CLI
uses: codacy/codacy-analysis-cli-action@562ee3e92b8e92df8b67e0a5ff8aa8e261919c08

@ -16,7 +16,7 @@ jobs:
steps:
# Step 1: Check out the repository
- name: Checkout repository
uses: actions/checkout@v4
uses: actions/checkout@v5
# Step 2: Set up Python
- name: Set up Python ${{ matrix.python-version }}

@ -28,7 +28,7 @@ jobs:
language: ["python"]
steps:
- name: Checkout repository
uses: actions/checkout@v4
uses: actions/checkout@v5
- name: Initialize CodeQL
uses: github/codeql-action/init@v3
with:

@ -28,7 +28,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: 'Checkout repository'
uses: actions/checkout@v4
uses: actions/checkout@v5
- name: 'Dependency Review'
uses: actions/dependency-review-action@v4
# Commonly enabled options, see https://github.com/actions/dependency-review-action#configuration-options for all available options.

@ -9,7 +9,7 @@ jobs:
deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
- uses: actions/setup-python@v5
with:
python-version: 3.11

@ -6,7 +6,7 @@ jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
- name: Set up Python
uses: actions/setup-python@v5

@ -33,7 +33,7 @@ jobs:
security-events: write
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
submodules: true

@ -35,7 +35,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
with:
submodules: true

@ -21,7 +21,7 @@ jobs:
python-version: ["3.10", "3.11", "3.12"]
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:

@ -11,7 +11,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
- name: Set up Python 3.10
uses: actions/setup-python@v5

@ -27,7 +27,7 @@ jobs:
runs-on: "ubuntu-20.04"
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v5
- name: Build an image from Dockerfile
run: |

@ -1,8 +1,6 @@
# Smart Database Powered by Hierarchical Multi-Agent Workflow
This module implements a fully autonomous database management system using a hierarchical
multi-agent architecture. The system includes specialized agents for different database
operations coordinated by a Database Director agent.
This module implements a fully autonomous database management system using a hierarchical multi-agent architecture. The system includes specialized agents for different database operations coordinated by a Database Director agent.
## Features

@ -14643,7 +14643,7 @@ The following example showcases how to use the `AgentRearrange` class to manage
```python
from swarms.structs.agent import Agent
from swarms.structs.rearrange import AgentRearrange
from swarms.structs.agent_rearrange import AgentRearrange
# Initialize the Director agent using Anthropic model via model_name
director = Agent(
@ -44327,7 +44327,7 @@ The flow pattern uses arrow notation (`->`) to define execution order:
### Basic Sequential Flow
```python
from swarms.structs.swarm_arange import SwarmRearrange
from swarms.structs.swarm_rearrange import SwarmRearrange
import os
from swarms import Agent, AgentRearrange
from swarm_models import OpenAIChat

@ -49,7 +49,7 @@ The following example showcases how to use the `AgentRearrange` class to manage
```python
from swarms.structs.agent import Agent
from swarms.structs.rearrange import AgentRearrange
from swarms.structs.agent_rearrange import AgentRearrange
# Initialize the Director agent using Anthropic model via model_name
director = Agent(

@ -46,7 +46,7 @@ The flow pattern uses arrow notation (`->`) to define execution order:
### Basic Sequential Flow
```python
from swarms.structs.swarm_arange import SwarmRearrange
from swarms.structs.swarm_rearrange import SwarmRearrange
import os
from swarms import Agent, AgentRearrange
from swarm_models import OpenAIChat

@ -1,5 +1,10 @@
from swarms import Agent
import litellm
litellm._turn_on_debug() # 👈 this is the 1-line change you need to make
# Initialize the agent
agent = Agent(
agent_name="Quantitative-Trading-Agent",

@ -23,6 +23,9 @@ from loguru import logger
from swarms import Agent, HierarchicalSwarm
from dotenv import load_dotenv
load_dotenv()
# =============================================================================
# DATABASE TOOLS - Core Functions for Database Operations
@ -901,6 +904,7 @@ smart_database_swarm = HierarchicalSwarm(
description="A comprehensive database management system with specialized agents for creation, schema management, data operations, and querying, coordinated by a database director",
director_model_name="gpt-4.1",
agents=database_specialists,
director_reasoning_enabled=False,
max_loops=1,
verbose=True,
)
@ -917,7 +921,8 @@ if __name__ == "__main__":
print("SMART DATABASE SWARM - E-COMMERCE SYSTEM EXAMPLE")
print("=" * 80)
task1 = """Create a comprehensive e-commerce database system with the following requirements:
task1 = """
Create a comprehensive e-commerce database system with the following requirements:
1. Create a database called 'ecommerce_db'
2. Create tables for:

@ -1,5 +1,5 @@
from swarms.structs.agent import Agent
from swarms.structs.council_judge import CouncilAsAJudge
from swarms.structs.council_as_judge import CouncilAsAJudge
# ========== USAGE EXAMPLE ==========

@ -46,7 +46,7 @@ technical_analyst = Agent(
)
# Create list of agents
agents = [market_researcher, financial_analyst, technical_analyst]
agents = [market_researcher, financial_analyst]
# Initialize the concurrent workflow
workflow = ConcurrentWorkflow(

@ -8,7 +8,7 @@ from loguru import logger
from tqdm import tqdm
from swarms.structs.agent import Agent
from swarms.structs.council_judge import CouncilAsAJudge
from swarms.structs.council_as_judge import CouncilAsAJudge
# Dataset configurations
DATASET_CONFIGS = {

@ -1,5 +1,5 @@
from swarms.structs.agent import Agent
from swarms.structs.council_judge import CouncilAsAJudge
from swarms.structs.council_as_judge import CouncilAsAJudge
if __name__ == "__main__":

@ -1,5 +1,5 @@
from swarms.structs.agent import Agent
from swarms.structs.council_judge import CouncilAsAJudge
from swarms.structs.council_as_judge import CouncilAsAJudge
if __name__ == "__main__":

@ -5,7 +5,7 @@ This example shows how to use the CouncilAsAJudge to evaluate various types
of responses including technical explanations, creative writing, and problem-solving.
"""
from swarms.structs.council_judge import CouncilAsAJudge
from swarms.structs.council_as_judge import CouncilAsAJudge
def evaluate_technical_response():

@ -5,7 +5,7 @@ This example shows how to use the CouncilAsAJudge with different output types,
custom worker configurations, and focused evaluation scenarios.
"""
from swarms.structs.council_judge import CouncilAsAJudge
from swarms.structs.council_as_judge import CouncilAsAJudge
def evaluate_with_final_output():

@ -6,7 +6,7 @@ across multiple dimensions including accuracy, helpfulness, harmlessness,
coherence, conciseness, and instruction adherence.
"""
from swarms.structs.council_judge import CouncilAsAJudge
from swarms.structs.council_as_judge import CouncilAsAJudge
def main():

@ -0,0 +1,51 @@
#!/usr/bin/env python3
"""
Basic Graph Workflow Example
A minimal example showing how to use GraphWorkflow with backend selection.
"""
from swarms.structs.graph_workflow import GraphWorkflow
from swarms.structs.agent import Agent
agent_one = Agent(agent_name="research_agent", model="gpt-4o-mini")
agent_two = Agent(
agent_name="research_agent_two", model="gpt-4o-mini"
)
agent_three = Agent(
agent_name="research_agent_three", model="gpt-4o-mini"
)
def main():
"""
Run a basic graph workflow example without print statements.
"""
# Create agents
# Create workflow with backend selection
workflow = GraphWorkflow(
name="Basic Example",
verbose=True,
)
# Add agents to workflow
workflow.add_node(agent_one)
workflow.add_node(agent_two)
workflow.add_node(agent_three)
# Create simple chain using the actual agent names
workflow.add_edge("research_agent", "research_agent_two")
workflow.add_edge("research_agent_two", "research_agent_three")
# Compile the workflow
workflow.compile()
# Run the workflow
task = "Complete a simple task"
results = workflow.run(task)
return results
if __name__ == "__main__":
main()

@ -1,19 +0,0 @@
from swarms.sims.senator_assembly import SenatorAssembly
def main():
"""
Simulate a Senate vote on a bill to invade Cuba and claim it as the 51st state.
This function initializes the SenatorAssembly and runs a concurrent vote simulation
on the specified bill.
"""
senator_simulation = SenatorAssembly()
senator_simulation.simulate_vote_concurrent(
"A bill proposing to deregulate the IPO (Initial Public Offering) market in the United States as extensively as possible. The bill seeks to remove or significantly reduce existing regulatory requirements and oversight for companies seeking to go public, with the aim of increasing market efficiency and access to capital. Senators must consider the potential economic, legal, and ethical consequences of such broad deregulation, and cast their votes accordingly.",
batch_size=10,
)
if __name__ == "__main__":
main()

@ -30,6 +30,7 @@ try:
WikipediaPersonalityScraper,
MEPPersonalityProfile,
)
WIKIPEDIA_PERSONALITY_AVAILABLE = True
except ImportError:
WIKIPEDIA_PERSONALITY_AVAILABLE = False
@ -52,4 +53,4 @@ __all__ = [
"WikipediaPersonalityScraper",
"MEPPersonalityProfile",
"WIKIPEDIA_PERSONALITY_AVAILABLE",
]
]

@ -5,25 +5,21 @@ This script demonstrates the comprehensive democratic functionality of the EuroS
including bill introduction, committee work, parliamentary debates, and democratic voting.
"""
import json
import time
from datetime import datetime
# Import directly from the file
from euroswarm_parliament import (
EuroSwarmParliament,
VoteType,
ParliamentaryRole,
ParliamentaryMember
)
def demonstrate_parliament_initialization():
"""Demonstrate parliament initialization and basic functionality with cost optimization."""
print("\nEUROSWARM PARLIAMENT INITIALIZATION DEMONSTRATION (COST OPTIMIZED)")
print(
"\nEUROSWARM PARLIAMENT INITIALIZATION DEMONSTRATION (COST OPTIMIZED)"
)
print("=" * 60)
# Initialize the parliament with cost optimization
parliament = EuroSwarmParliament(
eu_data_file="EU.xml",
@ -35,487 +31,632 @@ def demonstrate_parliament_initialization():
enable_caching=True, # NEW: Enable response caching
batch_size=25, # NEW: Batch size for concurrent execution
budget_limit=100.0, # NEW: Budget limit in dollars
verbose=True
verbose=True,
)
print(f"Parliament initialized with {len(parliament.meps)} MEPs")
# Show parliament composition with cost stats
composition = parliament.get_parliament_composition()
print(f"\nPARLIAMENT COMPOSITION:")
print("\nPARLIAMENT COMPOSITION:")
print(f"Total MEPs: {composition['total_meps']}")
print(f"Loaded MEPs: {composition['loaded_meps']} (lazy loading active)")
print(f"\nCOST OPTIMIZATION:")
cost_stats = composition['cost_stats']
print(f"Budget Limit: ${cost_stats['budget_remaining'] + cost_stats['total_cost']:.2f}")
print(
f"Loaded MEPs: {composition['loaded_meps']} (lazy loading active)"
)
print("\nCOST OPTIMIZATION:")
cost_stats = composition["cost_stats"]
print(
f"Budget Limit: ${cost_stats['budget_remaining'] + cost_stats['total_cost']:.2f}"
)
print(f"Budget Used: ${cost_stats['total_cost']:.2f}")
print(f"Budget Remaining: ${cost_stats['budget_remaining']:.2f}")
print(f"Cache Hit Rate: {cost_stats['cache_hit_rate']:.1%}")
print(f"\nPOLITICAL GROUP DISTRIBUTION:")
for group, data in composition['political_groups'].items():
count = data['count']
percentage = data['percentage']
print("\nPOLITICAL GROUP DISTRIBUTION:")
for group, data in composition["political_groups"].items():
count = data["count"]
percentage = data["percentage"]
print(f" {group}: {count} MEPs ({percentage:.1f}%)")
print(f"\nCOMMITTEE LEADERSHIP:")
for committee_name, committee_data in composition['committees'].items():
chair = committee_data['chair']
print("\nCOMMITTEE LEADERSHIP:")
for committee_name, committee_data in composition[
"committees"
].items():
chair = committee_data["chair"]
if chair:
print(f" {committee_name}: {chair}")
return parliament
def demonstrate_individual_mep_interaction(parliament):
"""Demonstrate individual MEP interaction and personality."""
print("\nINDIVIDUAL MEP INTERACTION DEMONSTRATION")
print("=" * 60)
# Get a sample MEP
sample_mep_name = list(parliament.meps.keys())[0]
sample_mep = parliament.meps[sample_mep_name]
print(f"Sample MEP: {sample_mep.full_name}")
print(f"Country: {sample_mep.country}")
print(f"Political Group: {sample_mep.political_group}")
print(f"National Party: {sample_mep.national_party}")
print(f"Committees: {', '.join(sample_mep.committees)}")
print(f"Expertise Areas: {', '.join(sample_mep.expertise_areas)}")
# Test MEP agent interaction
if sample_mep.agent:
test_prompt = "What are your views on European integration and how do you approach cross-border cooperation?"
print(f"\nMEP Response to: '{test_prompt}'")
print("-" * 50)
try:
response = sample_mep.agent.run(test_prompt)
print(response[:500] + "..." if len(response) > 500 else response)
print(
response[:500] + "..."
if len(response) > 500
else response
)
except Exception as e:
print(f"Error getting MEP response: {e}")
def demonstrate_committee_work(parliament):
"""Demonstrate committee work and hearings."""
print("\nCOMMITTEE WORK DEMONSTRATION")
print("=" * 60)
# Get a real MEP as sponsor
sponsor = list(parliament.meps.keys())[0]
# Create a test bill
bill = parliament.introduce_bill(
title="European Digital Rights and Privacy Protection Act",
description="Comprehensive legislation to strengthen digital rights, enhance privacy protection, and establish clear guidelines for data handling across the European Union.",
bill_type=VoteType.ORDINARY_LEGISLATIVE_PROCEDURE,
committee="Legal Affairs",
sponsor=sponsor
sponsor=sponsor,
)
print(f"Bill: {bill.title}")
print(f"Committee: {bill.committee}")
print(f"Sponsor: {bill.sponsor}")
# Conduct committee hearing
print(f"\nCONDUCTING COMMITTEE HEARING...")
hearing_result = parliament.conduct_committee_hearing(bill.committee, bill)
print("\nCONDUCTING COMMITTEE HEARING...")
hearing_result = parliament.conduct_committee_hearing(
bill.committee, bill
)
print(f"Committee: {hearing_result['committee']}")
print(f"Participants: {len(hearing_result['participants'])} MEPs")
print(f"Recommendation: {hearing_result['recommendations']['recommendation']}")
print(f"Support: {hearing_result['recommendations']['support_percentage']:.1f}%")
print(f"Oppose: {hearing_result['recommendations']['oppose_percentage']:.1f}%")
print(f"Amend: {hearing_result['recommendations']['amend_percentage']:.1f}%")
print(
f"Recommendation: {hearing_result['recommendations']['recommendation']}"
)
print(
f"Support: {hearing_result['recommendations']['support_percentage']:.1f}%"
)
print(
f"Oppose: {hearing_result['recommendations']['oppose_percentage']:.1f}%"
)
print(
f"Amend: {hearing_result['recommendations']['amend_percentage']:.1f}%"
)
def demonstrate_parliamentary_debate(parliament):
"""Demonstrate parliamentary debate functionality."""
print("\nPARLIAMENTARY DEBATE DEMONSTRATION")
print("=" * 60)
# Get a real MEP as sponsor
sponsor = list(parliament.meps.keys())[1]
# Create a test bill
bill = parliament.introduce_bill(
title="European Green Deal Implementation Act",
description="Legislation to implement the European Green Deal, including carbon neutrality targets, renewable energy investments, and sustainable development measures.",
bill_type=VoteType.ORDINARY_LEGISLATIVE_PROCEDURE,
committee="Environment, Public Health and Food Safety",
sponsor=sponsor
sponsor=sponsor,
)
print(f"Bill: {bill.title}")
print(f"Description: {bill.description}")
# Conduct parliamentary debate
print(f"\nCONDUCTING PARLIAMENTARY DEBATE...")
debate_result = parliament.conduct_parliamentary_debate(bill, max_speakers=10)
print(f"Debate Participants: {len(debate_result['participants'])} MEPs")
print(f"Debate Analysis:")
print(f" Support: {debate_result['analysis']['support_count']} speakers ({debate_result['analysis']['support_percentage']:.1f}%)")
print(f" Oppose: {debate_result['analysis']['oppose_count']} speakers ({debate_result['analysis']['oppose_percentage']:.1f}%)")
print(f" Neutral: {debate_result['analysis']['neutral_count']} speakers ({debate_result['analysis']['neutral_percentage']:.1f}%)")
print("\nCONDUCTING PARLIAMENTARY DEBATE...")
debate_result = parliament.conduct_parliamentary_debate(
bill, max_speakers=10
)
print(
f"Debate Participants: {len(debate_result['participants'])} MEPs"
)
print("Debate Analysis:")
print(
f" Support: {debate_result['analysis']['support_count']} speakers ({debate_result['analysis']['support_percentage']:.1f}%)"
)
print(
f" Oppose: {debate_result['analysis']['oppose_count']} speakers ({debate_result['analysis']['oppose_percentage']:.1f}%)"
)
print(
f" Neutral: {debate_result['analysis']['neutral_count']} speakers ({debate_result['analysis']['neutral_percentage']:.1f}%)"
)
def demonstrate_democratic_voting(parliament):
"""Demonstrate democratic voting functionality."""
print("\nDEMOCRATIC VOTING DEMONSTRATION")
print("=" * 60)
# Get a real MEP as sponsor
sponsor = list(parliament.meps.keys())[2]
# Create a test bill
bill = parliament.introduce_bill(
title="European Social Rights and Labor Protection Act",
description="Legislation to strengthen social rights, improve labor conditions, and ensure fair treatment of workers across the European Union.",
bill_type=VoteType.ORDINARY_LEGISLATIVE_PROCEDURE,
committee="Employment and Social Affairs",
sponsor=sponsor
sponsor=sponsor,
)
print(f"Bill: {bill.title}")
print(f"Sponsor: {bill.sponsor}")
# Conduct democratic vote
print(f"\nCONDUCTING DEMOCRATIC VOTE...")
print("\nCONDUCTING DEMOCRATIC VOTE...")
vote_result = parliament.conduct_democratic_vote(bill)
# Calculate percentages
total_votes = vote_result.votes_for + vote_result.votes_against + vote_result.abstentions
in_favor_percentage = (vote_result.votes_for / total_votes * 100) if total_votes > 0 else 0
against_percentage = (vote_result.votes_against / total_votes * 100) if total_votes > 0 else 0
abstentions_percentage = (vote_result.abstentions / total_votes * 100) if total_votes > 0 else 0
print(f"Vote Results:")
total_votes = (
vote_result.votes_for
+ vote_result.votes_against
+ vote_result.abstentions
)
in_favor_percentage = (
(vote_result.votes_for / total_votes * 100)
if total_votes > 0
else 0
)
against_percentage = (
(vote_result.votes_against / total_votes * 100)
if total_votes > 0
else 0
)
abstentions_percentage = (
(vote_result.abstentions / total_votes * 100)
if total_votes > 0
else 0
)
print("Vote Results:")
print(f" Total Votes: {total_votes}")
print(f" In Favor: {vote_result.votes_for} ({in_favor_percentage:.1f}%)")
print(f" Against: {vote_result.votes_against} ({against_percentage:.1f}%)")
print(f" Abstentions: {vote_result.abstentions} ({abstentions_percentage:.1f}%)")
print(
f" In Favor: {vote_result.votes_for} ({in_favor_percentage:.1f}%)"
)
print(
f" Against: {vote_result.votes_against} ({against_percentage:.1f}%)"
)
print(
f" Abstentions: {vote_result.abstentions} ({abstentions_percentage:.1f}%)"
)
print(f" Result: {vote_result.result.value}")
# Show political group breakdown if available
if hasattr(vote_result, 'group_votes') and vote_result.group_votes:
print(f"\nPOLITICAL GROUP BREAKDOWN:")
if (
hasattr(vote_result, "group_votes")
and vote_result.group_votes
):
print("\nPOLITICAL GROUP BREAKDOWN:")
for group, votes in vote_result.group_votes.items():
print(f" {group}: {votes['in_favor']}/{votes['total']} in favor ({votes['percentage']:.1f}%)")
print(
f" {group}: {votes['in_favor']}/{votes['total']} in favor ({votes['percentage']:.1f}%)"
)
else:
print(f"\nIndividual votes recorded: {len(vote_result.individual_votes)} MEPs")
print(
f"\nIndividual votes recorded: {len(vote_result.individual_votes)} MEPs"
)
def demonstrate_complete_democratic_session(parliament):
"""Demonstrate a complete democratic parliamentary session."""
print("\nCOMPLETE DEMOCRATIC SESSION DEMONSTRATION")
print("=" * 60)
# Get a real MEP as sponsor
sponsor = list(parliament.meps.keys())[3]
# Run complete session
session_result = parliament.run_democratic_session(
bill_title="European Innovation and Technology Advancement Act",
bill_description="Comprehensive legislation to promote innovation, support technology startups, and establish Europe as a global leader in digital transformation and technological advancement.",
bill_type=VoteType.ORDINARY_LEGISLATIVE_PROCEDURE,
committee="Industry, Research and Energy",
sponsor=sponsor
sponsor=sponsor,
)
print(f"Session Results:")
print("Session Results:")
print(f" Bill: {session_result['bill'].title}")
print(f" Committee Hearing: {session_result['hearing']['recommendations']['recommendation']}")
print(f" Debate Participants: {len(session_result['debate']['participants'])} MEPs")
print(
f" Committee Hearing: {session_result['hearing']['recommendations']['recommendation']}"
)
print(
f" Debate Participants: {len(session_result['debate']['participants'])} MEPs"
)
print(f" Final Vote: {session_result['vote']['result']}")
print(f" Vote Margin: {session_result['vote']['in_favor_percentage']:.1f}% in favor")
print(
f" Vote Margin: {session_result['vote']['in_favor_percentage']:.1f}% in favor"
)
def demonstrate_political_analysis(parliament):
"""Demonstrate political analysis and voting prediction."""
print("\nPOLITICAL ANALYSIS DEMONSTRATION")
print("=" * 60)
# Get a real MEP as sponsor
sponsor = list(parliament.meps.keys())[4]
# Create a test bill
bill = parliament.introduce_bill(
title="European Climate Action and Sustainability Act",
description="Comprehensive climate action legislation including carbon pricing, renewable energy targets, and sustainable development measures.",
bill_type=VoteType.ORDINARY_LEGISLATIVE_PROCEDURE,
committee="Environment, Public Health and Food Safety",
sponsor=sponsor
sponsor=sponsor,
)
print(f"Bill: {bill.title}")
print(f"Sponsor: {bill.sponsor}")
# Analyze political landscape
analysis = parliament.analyze_political_landscape(bill)
print(f"\nPOLITICAL LANDSCAPE ANALYSIS:")
print("\nPOLITICAL LANDSCAPE ANALYSIS:")
print(f" Overall Support: {analysis['overall_support']:.1f}%")
print(f" Opposition: {analysis['opposition']:.1f}%")
print(f" Uncertainty: {analysis['uncertainty']:.1f}%")
print(f"\nPOLITICAL GROUP ANALYSIS:")
for group, data in analysis['group_analysis'].items():
print(f" {group}: {data['support']:.1f}% support, {data['opposition']:.1f}% opposition")
print("\nPOLITICAL GROUP ANALYSIS:")
for group, data in analysis["group_analysis"].items():
print(
f" {group}: {data['support']:.1f}% support, {data['opposition']:.1f}% opposition"
)
def demonstrate_hierarchical_democratic_voting(parliament):
"""Demonstrate hierarchical democratic voting with political group boards."""
print("\nHIERARCHICAL DEMOCRATIC VOTING DEMONSTRATION")
print("=" * 60)
# Get a real MEP as sponsor
sponsor = list(parliament.meps.keys())[5]
# Create a test bill
bill = parliament.introduce_bill(
title="European Climate Action and Sustainability Act",
description="Comprehensive climate action legislation including carbon pricing, renewable energy targets, and sustainable development measures.",
bill_type=VoteType.ORDINARY_LEGISLATIVE_PROCEDURE,
committee="Environment, Public Health and Food Safety",
sponsor=sponsor
sponsor=sponsor,
)
print(f"Bill: {bill.title}")
print(f"Sponsor: {bill.sponsor}")
# Conduct hierarchical vote
print(f"\nCONDUCTING HIERARCHICAL DEMOCRATIC VOTE...")
hierarchical_result = parliament.conduct_hierarchical_democratic_vote(bill)
print(f"Hierarchical Vote Results:")
print("\nCONDUCTING HIERARCHICAL DEMOCRATIC VOTE...")
hierarchical_result = (
parliament.conduct_hierarchical_democratic_vote(bill)
)
print("Hierarchical Vote Results:")
print(f" Total Votes: {hierarchical_result['total_votes']}")
print(f" In Favor: {hierarchical_result['in_favor']} ({hierarchical_result['in_favor_percentage']:.1f}%)")
print(f" Against: {hierarchical_result['against']} ({hierarchical_result['against_percentage']:.1f}%)")
print(
f" In Favor: {hierarchical_result['in_favor']} ({hierarchical_result['in_favor_percentage']:.1f}%)"
)
print(
f" Against: {hierarchical_result['against']} ({hierarchical_result['against_percentage']:.1f}%)"
)
print(f" Result: {hierarchical_result['result']}")
print(f"\nPOLITICAL GROUP BOARD DECISIONS:")
for group, decision in hierarchical_result['group_decisions'].items():
print(f" {group}: {decision['decision']} ({decision['confidence']:.1f}% confidence)")
print("\nPOLITICAL GROUP BOARD DECISIONS:")
for group, decision in hierarchical_result[
"group_decisions"
].items():
print(
f" {group}: {decision['decision']} ({decision['confidence']:.1f}% confidence)"
)
def demonstrate_complete_hierarchical_session(parliament):
"""Demonstrate a complete hierarchical democratic session."""
print("\nCOMPLETE HIERARCHICAL DEMOCRATIC SESSION DEMONSTRATION")
print("=" * 60)
# Get a real MEP as sponsor
sponsor = list(parliament.meps.keys())[6]
# Run complete hierarchical session
session_result = parliament.run_hierarchical_democratic_session(
bill_title="European Climate Action and Sustainability Act",
bill_description="Comprehensive climate action legislation including carbon pricing, renewable energy targets, and sustainable development measures.",
bill_type=VoteType.ORDINARY_LEGISLATIVE_PROCEDURE,
committee="Environment, Public Health and Food Safety",
sponsor=sponsor
sponsor=sponsor,
)
print(f"Hierarchical Session Results:")
print("Hierarchical Session Results:")
print(f" Bill: {session_result['bill'].title}")
print(f" Committee Hearing: {session_result['hearing']['recommendations']['recommendation']}")
print(f" Debate Participants: {len(session_result['debate']['participants'])} MEPs")
print(
f" Committee Hearing: {session_result['hearing']['recommendations']['recommendation']}"
)
print(
f" Debate Participants: {len(session_result['debate']['participants'])} MEPs"
)
print(f" Final Vote: {session_result['vote']['result']}")
print(f" Vote Margin: {session_result['vote']['in_favor_percentage']:.1f}% in favor")
print(
f" Vote Margin: {session_result['vote']['in_favor_percentage']:.1f}% in favor"
)
def demonstrate_wikipedia_personalities(parliament):
"""Demonstrate the Wikipedia personality system for realistic MEP behavior."""
print("\nWIKIPEDIA PERSONALITY SYSTEM DEMONSTRATION")
print("=" * 60)
# Check if Wikipedia personalities are available
if not parliament.enable_wikipedia_personalities:
print("Wikipedia personality system not available")
print("To enable: Install required dependencies and run Wikipedia scraper")
print(
"To enable: Install required dependencies and run Wikipedia scraper"
)
return
print(f"Wikipedia personality system enabled")
print(f"Loaded {len(parliament.personality_profiles)} personality profiles")
print("Wikipedia personality system enabled")
print(
f"Loaded {len(parliament.personality_profiles)} personality profiles"
)
# Show sample personality profiles
print(f"\nSAMPLE PERSONALITY PROFILES:")
print("\nSAMPLE PERSONALITY PROFILES:")
print("-" * 40)
sample_count = 0
for mep_name, profile in parliament.personality_profiles.items():
if sample_count >= 3: # Show only 3 samples
break
print(f"\n{mep_name}")
print(f" Wikipedia URL: {profile.wikipedia_url if profile.wikipedia_url else 'Not available'}")
print(f" Summary: {profile.summary[:200]}..." if profile.summary else "No summary available")
print(f" Political Views: {profile.political_views[:150]}..." if profile.political_views else "Based on party alignment")
print(f" Policy Focus: {profile.policy_focus[:150]}..." if profile.policy_focus else "General parliamentary work")
print(f" Achievements: {profile.achievements[:150]}..." if profile.achievements else "Parliamentary service")
print(
f" Wikipedia URL: {profile.wikipedia_url if profile.wikipedia_url else 'Not available'}"
)
print(
f" Summary: {profile.summary[:200]}..."
if profile.summary
else "No summary available"
)
print(
f" Political Views: {profile.political_views[:150]}..."
if profile.political_views
else "Based on party alignment"
)
print(
f" Policy Focus: {profile.policy_focus[:150]}..."
if profile.policy_focus
else "General parliamentary work"
)
print(
f" Achievements: {profile.achievements[:150]}..."
if profile.achievements
else "Parliamentary service"
)
print(f" Last Updated: {profile.last_updated}")
sample_count += 1
# Demonstrate personality-driven voting
print(f"\nPERSONALITY-DRIVEN VOTING DEMONSTRATION:")
print("\nPERSONALITY-DRIVEN VOTING DEMONSTRATION:")
print("-" * 50)
# Create a test bill that would trigger different personality responses
bill = parliament.introduce_bill(
title="European Climate Action and Green Technology Investment Act",
description="Comprehensive legislation to accelerate Europe's transition to renewable energy, including massive investments in green technology, carbon pricing mechanisms, and support for affected industries and workers.",
bill_type=VoteType.ORDINARY_LEGISLATIVE_PROCEDURE,
committee="Environment",
sponsor="Climate Action Leader"
sponsor="Climate Action Leader",
)
print(f"Bill: {bill.title}")
print(f"Description: {bill.description}")
# Show how different MEPs with Wikipedia personalities would respond
print(f"\nPERSONALITY-BASED RESPONSES:")
print("\nPERSONALITY-BASED RESPONSES:")
print("-" * 40)
sample_meps = list(parliament.personality_profiles.keys())[:3]
for mep_name in sample_meps:
mep = parliament.meps.get(mep_name)
profile = parliament.personality_profiles.get(mep_name)
if mep and profile:
print(f"\n{mep_name} ({mep.political_group})")
# Show personality influence
if profile.political_views:
print(f" Political Views: {profile.political_views[:100]}...")
print(
f" Political Views: {profile.political_views[:100]}..."
)
if profile.policy_focus:
print(f" Policy Focus: {profile.policy_focus[:100]}...")
print(
f" Policy Focus: {profile.policy_focus[:100]}..."
)
# Predict voting behavior based on personality
if "environment" in profile.policy_focus.lower() or "climate" in profile.political_views.lower():
if (
"environment" in profile.policy_focus.lower()
or "climate" in profile.political_views.lower()
):
predicted_vote = "LIKELY SUPPORT"
reasoning = "Environmental policy focus and climate advocacy"
elif "economic" in profile.policy_focus.lower() or "business" in profile.political_views.lower():
reasoning = (
"Environmental policy focus and climate advocacy"
)
elif (
"economic" in profile.policy_focus.lower()
or "business" in profile.political_views.lower()
):
predicted_vote = "LIKELY OPPOSE"
reasoning = "Economic concerns about investment costs"
else:
predicted_vote = "UNCERTAIN"
reasoning = "Mixed considerations based on party alignment"
reasoning = (
"Mixed considerations based on party alignment"
)
print(f" Predicted Vote: {predicted_vote}")
print(f" Reasoning: {reasoning}")
# Demonstrate scraping functionality
print(f"\nWIKIPEDIA SCRAPING CAPABILITIES:")
print("\nWIKIPEDIA SCRAPING CAPABILITIES:")
print("-" * 50)
print("Can scrape Wikipedia data for all 717 MEPs")
print("Extracts political views, career history, and achievements")
print(
"Extracts political views, career history, and achievements"
)
print("Creates detailed personality profiles in JSON format")
print("Integrates real personality data into AI agent system prompts")
print(
"Integrates real personality data into AI agent system prompts"
)
print("Enables realistic, personality-driven voting behavior")
print("Respectful API usage with configurable delays")
print(f"\nTo scrape all MEP personalities:")
print("\nTo scrape all MEP personalities:")
print(" parliament.scrape_wikipedia_personalities(delay=1.0)")
print(" # This will create personality profiles for all 717 MEPs")
print(
" # This will create personality profiles for all 717 MEPs"
)
print(" # Profiles are saved in 'mep_personalities/' directory")
def demonstrate_optimized_parliamentary_session(parliament):
"""Demonstrate cost-optimized parliamentary session."""
print("\nCOST-OPTIMIZED PARLIAMENTARY SESSION DEMONSTRATION")
print("=" * 60)
# Run optimized session with cost limit
session_result = parliament.run_optimized_parliamentary_session(
bill_title="European Digital Rights and Privacy Protection Act",
bill_description="Comprehensive legislation to strengthen digital rights, enhance privacy protection, and establish clear guidelines for data handling across the European Union.",
bill_type=VoteType.ORDINARY_LEGISLATIVE_PROCEDURE,
committee="Legal Affairs",
max_cost=25.0 # Max $25 for this session
)
print(f"Session Results:")
print(f" Bill: {session_result['session_summary']['bill_title']}")
print(f" Final Outcome: {session_result['session_summary']['final_outcome']}")
print(f" Total Cost: ${session_result['session_summary']['total_cost']:.2f}")
print(f" Budget Remaining: ${session_result['cost_stats']['budget_remaining']:.2f}")
max_cost=25.0, # Max $25 for this session
)
print("Session Results:")
print(
f" Bill: {session_result['session_summary']['bill_title']}"
)
print(
f" Final Outcome: {session_result['session_summary']['final_outcome']}"
)
print(
f" Total Cost: ${session_result['session_summary']['total_cost']:.2f}"
)
print(
f" Budget Remaining: ${session_result['cost_stats']['budget_remaining']:.2f}"
)
# Show detailed cost statistics
cost_stats = parliament.get_cost_statistics()
print(f"\nDETAILED COST STATISTICS:")
print("\nDETAILED COST STATISTICS:")
print(f" Total Tokens Used: {cost_stats['total_tokens']:,}")
print(f" Requests Made: {cost_stats['requests_made']}")
print(f" Cache Hits: {cost_stats['cache_hits']}")
print(f" Cache Hit Rate: {cost_stats['cache_hit_rate']:.1%}")
print(f" Loading Efficiency: {cost_stats['loading_efficiency']:.1%}")
print(
f" Loading Efficiency: {cost_stats['loading_efficiency']:.1%}"
)
print(f" Cache Size: {cost_stats['cache_size']} entries")
return session_result
def main():
"""Main demonstration function."""
print("EUROSWARM PARLIAMENT - COST OPTIMIZED DEMONSTRATION")
print("=" * 60)
print("This demonstration shows the EuroSwarm Parliament with cost optimization features:")
print(
"This demonstration shows the EuroSwarm Parliament with cost optimization features:"
)
print("• Lazy loading of MEP agents (only create when needed)")
print("• Response caching (avoid repeated API calls)")
print("• Batch processing (control memory and cost)")
print("• Budget controls (hard limits on spending)")
print("• Cost tracking (real-time monitoring)")
# Initialize parliament with cost optimization
parliament = demonstrate_parliament_initialization()
# Demonstrate individual MEP interaction (will trigger lazy loading)
demonstrate_individual_mep_interaction(parliament)
# Demonstrate committee work with cost optimization
demonstrate_committee_work(parliament)
# Demonstrate parliamentary debate with cost optimization
demonstrate_parliamentary_debate(parliament)
# Demonstrate democratic voting with cost optimization
demonstrate_democratic_voting(parliament)
# Demonstrate political analysis with cost optimization
demonstrate_political_analysis(parliament)
# Demonstrate optimized parliamentary session
demonstrate_optimized_parliamentary_session(parliament)
# Show final cost statistics
final_stats = parliament.get_cost_statistics()
print(f"\nFINAL COST STATISTICS:")
print("\nFINAL COST STATISTICS:")
print(f"Total Cost: ${final_stats['total_cost']:.2f}")
print(f"Budget Remaining: ${final_stats['budget_remaining']:.2f}")
print(f"Cache Hit Rate: {final_stats['cache_hit_rate']:.1%}")
print(f"Loading Efficiency: {final_stats['loading_efficiency']:.1%}")
print(f"\n✅ COST OPTIMIZATION DEMONSTRATION COMPLETED!")
print(f"✅ EuroSwarm Parliament now supports cost-effective large-scale simulations")
print(f"✅ Lazy loading: {final_stats['loaded_meps']}/{final_stats['total_meps']} MEPs loaded")
print(
f"Loading Efficiency: {final_stats['loading_efficiency']:.1%}"
)
print("\n✅ COST OPTIMIZATION DEMONSTRATION COMPLETED!")
print(
"✅ EuroSwarm Parliament now supports cost-effective large-scale simulations"
)
print(
f"✅ Lazy loading: {final_stats['loaded_meps']}/{final_stats['total_meps']} MEPs loaded"
)
print(f"✅ Caching: {final_stats['cache_hit_rate']:.1%} hit rate")
print(f"✅ Budget control: ${final_stats['total_cost']:.2f} spent of ${final_stats['budget_remaining'] + final_stats['total_cost']:.2f} budget")
print(
f"✅ Budget control: ${final_stats['total_cost']:.2f} spent of ${final_stats['budget_remaining'] + final_stats['total_cost']:.2f} budget"
)
if __name__ == "__main__":
main()
main()

@ -5,63 +5,76 @@ Test script to verify mass agent template can process more than 500 agents.
from mass_agent_template import MassAgentTemplate
def test_mass_agents():
print("Testing Mass Agent Template - Processing More Than 50 Agents")
print(
"Testing Mass Agent Template - Processing More Than 50 Agents"
)
print("=" * 60)
# Initialize template with 200 agents
template = MassAgentTemplate(
agent_count=200,
budget_limit=50.0,
batch_size=25,
verbose=True
verbose=True,
)
print(f"Initialized with {len(template.agents)} agents")
print(f"Budget limit: ${template.cost_tracker.budget_limit}")
# Test processing 100 agents
print(f"\nTesting with 100 agents...")
print("\nTesting with 100 agents...")
result = template.run_mass_task(
"What is the most important skill for your role?",
agent_count=100
agent_count=100,
)
print(f"Results:")
print("Results:")
print(f" Agents processed: {len(result['agents_used'])}")
print(f" Cost: ${result['cost_stats']['total_cost']:.4f}")
print(f" Budget remaining: ${result['cost_stats']['budget_remaining']:.2f}")
print(
f" Budget remaining: ${result['cost_stats']['budget_remaining']:.2f}"
)
print(f" Cached: {result.get('cached', False)}")
# Test processing 150 agents
print(f"\nTesting with 150 agents...")
print("\nTesting with 150 agents...")
result2 = template.run_mass_task(
"Describe your approach to problem-solving",
agent_count=150
"Describe your approach to problem-solving", agent_count=150
)
print(f"Results:")
print("Results:")
print(f" Agents processed: {len(result2['agents_used'])}")
print(f" Cost: ${result2['cost_stats']['total_cost']:.4f}")
print(f" Budget remaining: ${result2['cost_stats']['budget_remaining']:.2f}")
print(
f" Budget remaining: ${result2['cost_stats']['budget_remaining']:.2f}"
)
print(f" Cached: {result2.get('cached', False)}")
# Show final stats
final_stats = template.get_system_stats()
print(f"\nFinal Statistics:")
print("\nFinal Statistics:")
print(f" Total agents: {final_stats['total_agents']}")
print(f" Loaded agents: {final_stats['loaded_agents']}")
print(f" Total cost: ${final_stats['cost_stats']['total_cost']:.4f}")
print(f" Budget remaining: ${final_stats['cost_stats']['budget_remaining']:.2f}")
print(
f" Total cost: ${final_stats['cost_stats']['total_cost']:.4f}"
)
print(
f" Budget remaining: ${final_stats['cost_stats']['budget_remaining']:.2f}"
)
# Success criteria
total_processed = len(result['agents_used']) + len(result2['agents_used'])
total_processed = len(result["agents_used"]) + len(
result2["agents_used"]
)
print(f"\nTotal agents processed: {total_processed}")
if total_processed > 50:
print("✅ SUCCESS: Template processed more than 50 agents!")
else:
print("❌ FAILURE: Template still limited to 50 agents")
if __name__ == "__main__":
test_mass_agents()
test_mass_agents()

@ -14,14 +14,13 @@ from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
import requests
from loguru import logger
import xml.etree.ElementTree as ET
@dataclass
class MEPPersonalityProfile:
"""
Comprehensive personality profile for an MEP based on Wikipedia data.
Attributes:
full_name: Full name of the MEP
mep_id: Unique MEP identifier
@ -47,7 +46,7 @@ class MEPPersonalityProfile:
social_media: Social media presence
last_updated: When the profile was last updated
"""
full_name: str
mep_id: str
wikipedia_url: Optional[str] = None
@ -77,11 +76,15 @@ class WikipediaPersonalityScraper:
"""
Scraper for gathering Wikipedia personality data for MEPs.
"""
def __init__(self, output_dir: str = "mep_personalities", verbose: bool = True):
def __init__(
self,
output_dir: str = "mep_personalities",
verbose: bool = True,
):
"""
Initialize the Wikipedia personality scraper.
Args:
output_dir: Directory to store personality profiles
verbose: Enable verbose logging
@ -89,61 +92,81 @@ class WikipediaPersonalityScraper:
self.output_dir = output_dir
self.verbose = verbose
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'EuroSwarm Parliament Personality Scraper/1.0 (https://github.com/swarms-democracy)'
})
self.session.headers.update(
{
"User-Agent": "EuroSwarm Parliament Personality Scraper/1.0 (https://github.com/swarms-democracy)"
}
)
# Create output directory
os.makedirs(output_dir, exist_ok=True)
if verbose:
logger.info(f"Wikipedia Personality Scraper initialized. Output directory: {output_dir}")
logger.info(
f"Wikipedia Personality Scraper initialized. Output directory: {output_dir}"
)
def extract_mep_data_from_xml(self, xml_file: str = "EU.xml") -> List[Dict[str, str]]:
def extract_mep_data_from_xml(
self, xml_file: str = "EU.xml"
) -> List[Dict[str, str]]:
"""
Extract MEP data from EU.xml file.
Args:
xml_file: Path to EU.xml file
Returns:
List of MEP data dictionaries
"""
meps = []
try:
with open(xml_file, 'r', encoding='utf-8') as f:
with open(xml_file, "r", encoding="utf-8") as f:
content = f.read()
# Use regex to extract MEP data
mep_pattern = r'<mep>\s*<fullName>(.*?)</fullName>\s*<country>(.*?)</country>\s*<politicalGroup>(.*?)</politicalGroup>\s*<id>(.*?)</id>\s*<nationalPoliticalGroup>(.*?)</nationalPoliticalGroup>\s*</mep>'
mep_pattern = r"<mep>\s*<fullName>(.*?)</fullName>\s*<country>(.*?)</country>\s*<politicalGroup>(.*?)</politicalGroup>\s*<id>(.*?)</id>\s*<nationalPoliticalGroup>(.*?)</nationalPoliticalGroup>\s*</mep>"
mep_matches = re.findall(mep_pattern, content, re.DOTALL)
for full_name, country, political_group, mep_id, national_party in mep_matches:
meps.append({
'full_name': full_name.strip(),
'country': country.strip(),
'political_group': political_group.strip(),
'mep_id': mep_id.strip(),
'national_party': national_party.strip()
})
for (
full_name,
country,
political_group,
mep_id,
national_party,
) in mep_matches:
meps.append(
{
"full_name": full_name.strip(),
"country": country.strip(),
"political_group": political_group.strip(),
"mep_id": mep_id.strip(),
"national_party": national_party.strip(),
}
)
if self.verbose:
logger.info(f"Extracted {len(meps)} MEPs from {xml_file}")
logger.info(
f"Extracted {len(meps)} MEPs from {xml_file}"
)
except Exception as e:
logger.error(f"Error extracting MEP data from {xml_file}: {e}")
logger.error(
f"Error extracting MEP data from {xml_file}: {e}"
)
return meps
def search_wikipedia_page(self, mep_name: str, country: str) -> Optional[str]:
def search_wikipedia_page(
self, mep_name: str, country: str
) -> Optional[str]:
"""
Search for a Wikipedia page for an MEP.
Args:
mep_name: Full name of the MEP
country: Country of the MEP
Returns:
Wikipedia page title if found, None otherwise
"""
@ -151,48 +174,56 @@ class WikipediaPersonalityScraper:
# Search for the MEP on Wikipedia
search_url = "https://en.wikipedia.org/w/api.php"
search_params = {
'action': 'query',
'format': 'json',
'list': 'search',
'srsearch': f'"{mep_name}" {country}',
'srlimit': 5,
'srnamespace': 0
"action": "query",
"format": "json",
"list": "search",
"srsearch": f'"{mep_name}" {country}',
"srlimit": 5,
"srnamespace": 0,
}
response = self.session.get(search_url, params=search_params)
response = self.session.get(
search_url, params=search_params
)
response.raise_for_status()
data = response.json()
search_results = data.get('query', {}).get('search', [])
search_results = data.get("query", {}).get("search", [])
if search_results:
# Return the first result
return search_results[0]['title']
return search_results[0]["title"]
# Try alternative search without quotes
search_params['srsearch'] = f'{mep_name} {country}'
response = self.session.get(search_url, params=search_params)
search_params["srsearch"] = f"{mep_name} {country}"
response = self.session.get(
search_url, params=search_params
)
response.raise_for_status()
data = response.json()
search_results = data.get('query', {}).get('search', [])
search_results = data.get("query", {}).get("search", [])
if search_results:
return search_results[0]['title']
return search_results[0]["title"]
except Exception as e:
if self.verbose:
logger.warning(f"Error searching Wikipedia for {mep_name}: {e}")
logger.warning(
f"Error searching Wikipedia for {mep_name}: {e}"
)
return None
def get_wikipedia_content(self, page_title: str) -> Optional[Dict[str, Any]]:
def get_wikipedia_content(
self, page_title: str
) -> Optional[Dict[str, Any]]:
"""
Get Wikipedia content for a specific page.
Args:
page_title: Wikipedia page title
Returns:
Dictionary containing page content and metadata
"""
@ -200,376 +231,451 @@ class WikipediaPersonalityScraper:
# Get page content
content_url = "https://en.wikipedia.org/w/api.php"
content_params = {
'action': 'query',
'format': 'json',
'titles': page_title,
'prop': 'extracts|info|categories',
'exintro': True,
'explaintext': True,
'inprop': 'url',
'cllimit': 50
"action": "query",
"format": "json",
"titles": page_title,
"prop": "extracts|info|categories",
"exintro": True,
"explaintext": True,
"inprop": "url",
"cllimit": 50,
}
response = self.session.get(content_url, params=content_params)
response = self.session.get(
content_url, params=content_params
)
response.raise_for_status()
data = response.json()
pages = data.get('query', {}).get('pages', {})
pages = data.get("query", {}).get("pages", {})
if pages:
page_id = list(pages.keys())[0]
page_data = pages[page_id]
return {
'title': page_data.get('title', ''),
'extract': page_data.get('extract', ''),
'url': page_data.get('fullurl', ''),
'categories': [cat['title'] for cat in page_data.get('categories', [])],
'pageid': page_data.get('pageid', ''),
'length': page_data.get('length', 0)
"title": page_data.get("title", ""),
"extract": page_data.get("extract", ""),
"url": page_data.get("fullurl", ""),
"categories": [
cat["title"]
for cat in page_data.get("categories", [])
],
"pageid": page_data.get("pageid", ""),
"length": page_data.get("length", 0),
}
except Exception as e:
if self.verbose:
logger.warning(f"Error getting Wikipedia content for {page_title}: {e}")
logger.warning(
f"Error getting Wikipedia content for {page_title}: {e}"
)
return None
def parse_wikipedia_content(self, content: str, mep_name: str) -> Dict[str, str]:
def parse_wikipedia_content(
self, content: str, mep_name: str
) -> Dict[str, str]:
"""
Parse Wikipedia content to extract structured personality information.
Args:
content: Raw Wikipedia content
mep_name: Name of the MEP
Returns:
Dictionary of parsed personality information
"""
personality_data = {
'summary': '',
'early_life': '',
'political_career': '',
'political_views': '',
'policy_focus': '',
'achievements': '',
'controversies': '',
'personal_life': '',
'education': '',
'professional_background': '',
'party_affiliations': '',
'committee_experience': '',
'voting_record': '',
'public_statements': '',
'interests': '',
'languages': '',
'awards': '',
'publications': '',
'social_media': ''
"summary": "",
"early_life": "",
"political_career": "",
"political_views": "",
"policy_focus": "",
"achievements": "",
"controversies": "",
"personal_life": "",
"education": "",
"professional_background": "",
"party_affiliations": "",
"committee_experience": "",
"voting_record": "",
"public_statements": "",
"interests": "",
"languages": "",
"awards": "",
"publications": "",
"social_media": "",
}
# Extract summary (first paragraph)
paragraphs = content.split('\n\n')
paragraphs = content.split("\n\n")
if paragraphs:
personality_data['summary'] = paragraphs[0][:1000] # Limit summary length
personality_data["summary"] = paragraphs[0][
:1000
] # Limit summary length
# Look for specific sections
content_lower = content.lower()
# Early life and education
early_life_patterns = [
r'early life[^.]*\.',
r'born[^.]*\.',
r'childhood[^.]*\.',
r'grew up[^.]*\.',
r'education[^.]*\.'
r"early life[^.]*\.",
r"born[^.]*\.",
r"childhood[^.]*\.",
r"grew up[^.]*\.",
r"education[^.]*\.",
]
for pattern in early_life_patterns:
matches = re.findall(pattern, content_lower, re.IGNORECASE)
matches = re.findall(
pattern, content_lower, re.IGNORECASE
)
if matches:
personality_data['early_life'] = ' '.join(matches[:3]) # Take first 3 matches
personality_data["early_life"] = " ".join(
matches[:3]
) # Take first 3 matches
break
# Political career
political_patterns = [
r'political career[^.]*\.',
r'elected[^.]*\.',
r'parliament[^.]*\.',
r'minister[^.]*\.',
r'party[^.]*\.'
r"political career[^.]*\.",
r"elected[^.]*\.",
r"parliament[^.]*\.",
r"minister[^.]*\.",
r"party[^.]*\.",
]
for pattern in political_patterns:
matches = re.findall(pattern, content_lower, re.IGNORECASE)
matches = re.findall(
pattern, content_lower, re.IGNORECASE
)
if matches:
personality_data['political_career'] = ' '.join(matches[:5]) # Take first 5 matches
personality_data["political_career"] = " ".join(
matches[:5]
) # Take first 5 matches
break
# Political views
views_patterns = [
r'political views[^.]*\.',
r'positions[^.]*\.',
r'advocates[^.]*\.',
r'supports[^.]*\.',
r'opposes[^.]*\.'
r"political views[^.]*\.",
r"positions[^.]*\.",
r"advocates[^.]*\.",
r"supports[^.]*\.",
r"opposes[^.]*\.",
]
for pattern in views_patterns:
matches = re.findall(pattern, content_lower, re.IGNORECASE)
matches = re.findall(
pattern, content_lower, re.IGNORECASE
)
if matches:
personality_data['political_views'] = ' '.join(matches[:3])
personality_data["political_views"] = " ".join(
matches[:3]
)
break
# Policy focus
policy_patterns = [
r'policy[^.]*\.',
r'focus[^.]*\.',
r'issues[^.]*\.',
r'legislation[^.]*\.'
r"policy[^.]*\.",
r"focus[^.]*\.",
r"issues[^.]*\.",
r"legislation[^.]*\.",
]
for pattern in policy_patterns:
matches = re.findall(pattern, content_lower, re.IGNORECASE)
matches = re.findall(
pattern, content_lower, re.IGNORECASE
)
if matches:
personality_data['policy_focus'] = ' '.join(matches[:3])
personality_data["policy_focus"] = " ".join(
matches[:3]
)
break
# Achievements
achievement_patterns = [
r'achievements[^.]*\.',
r'accomplishments[^.]*\.',
r'success[^.]*\.',
r'won[^.]*\.',
r'received[^.]*\.'
r"achievements[^.]*\.",
r"accomplishments[^.]*\.",
r"success[^.]*\.",
r"won[^.]*\.",
r"received[^.]*\.",
]
for pattern in achievement_patterns:
matches = re.findall(pattern, content_lower, re.IGNORECASE)
matches = re.findall(
pattern, content_lower, re.IGNORECASE
)
if matches:
personality_data['achievements'] = ' '.join(matches[:3])
personality_data["achievements"] = " ".join(
matches[:3]
)
break
return personality_data
def create_personality_profile(self, mep_data: Dict[str, str]) -> MEPPersonalityProfile:
def create_personality_profile(
self, mep_data: Dict[str, str]
) -> MEPPersonalityProfile:
"""
Create a personality profile for an MEP.
Args:
mep_data: MEP data from XML file
Returns:
MEPPersonalityProfile object
"""
mep_name = mep_data['full_name']
country = mep_data['country']
mep_name = mep_data["full_name"]
country = mep_data["country"]
# Search for Wikipedia page
page_title = self.search_wikipedia_page(mep_name, country)
if page_title:
# Get Wikipedia content
wiki_content = self.get_wikipedia_content(page_title)
if wiki_content:
# Parse content
personality_data = self.parse_wikipedia_content(wiki_content['extract'], mep_name)
personality_data = self.parse_wikipedia_content(
wiki_content["extract"], mep_name
)
# Create profile
profile = MEPPersonalityProfile(
full_name=mep_name,
mep_id=mep_data['mep_id'],
wikipedia_url=wiki_content['url'],
summary=personality_data['summary'],
early_life=personality_data['early_life'],
political_career=personality_data['political_career'],
political_views=personality_data['political_views'],
policy_focus=personality_data['policy_focus'],
achievements=personality_data['achievements'],
controversies=personality_data['controversies'],
personal_life=personality_data['personal_life'],
education=personality_data['education'],
professional_background=personality_data['professional_background'],
party_affiliations=personality_data['party_affiliations'],
committee_experience=personality_data['committee_experience'],
voting_record=personality_data['voting_record'],
public_statements=personality_data['public_statements'],
interests=personality_data['interests'],
languages=personality_data['languages'],
awards=personality_data['awards'],
publications=personality_data['publications'],
social_media=personality_data['social_media'],
last_updated=time.strftime("%Y-%m-%d %H:%M:%S")
mep_id=mep_data["mep_id"],
wikipedia_url=wiki_content["url"],
summary=personality_data["summary"],
early_life=personality_data["early_life"],
political_career=personality_data[
"political_career"
],
political_views=personality_data[
"political_views"
],
policy_focus=personality_data["policy_focus"],
achievements=personality_data["achievements"],
controversies=personality_data["controversies"],
personal_life=personality_data["personal_life"],
education=personality_data["education"],
professional_background=personality_data[
"professional_background"
],
party_affiliations=personality_data[
"party_affiliations"
],
committee_experience=personality_data[
"committee_experience"
],
voting_record=personality_data["voting_record"],
public_statements=personality_data[
"public_statements"
],
interests=personality_data["interests"],
languages=personality_data["languages"],
awards=personality_data["awards"],
publications=personality_data["publications"],
social_media=personality_data["social_media"],
last_updated=time.strftime("%Y-%m-%d %H:%M:%S"),
)
if self.verbose:
logger.info(f"Created personality profile for {mep_name} from Wikipedia")
logger.info(
f"Created personality profile for {mep_name} from Wikipedia"
)
return profile
# Create minimal profile if no Wikipedia data found
profile = MEPPersonalityProfile(
full_name=mep_name,
mep_id=mep_data['mep_id'],
mep_id=mep_data["mep_id"],
summary=f"{mep_name} is a Member of the European Parliament representing {country}.",
political_career=f"Currently serving as MEP for {country}.",
political_views=f"Member of {mep_data['political_group']} and {mep_data['national_party']}.",
last_updated=time.strftime("%Y-%m-%d %H:%M:%S")
last_updated=time.strftime("%Y-%m-%d %H:%M:%S"),
)
if self.verbose:
logger.warning(f"No Wikipedia data found for {mep_name}, created minimal profile")
logger.warning(
f"No Wikipedia data found for {mep_name}, created minimal profile"
)
return profile
def save_personality_profile(self, profile: MEPPersonalityProfile) -> str:
def save_personality_profile(
self, profile: MEPPersonalityProfile
) -> str:
"""
Save personality profile to JSON file.
Args:
profile: MEPPersonalityProfile object
Returns:
Path to saved file
"""
# Create safe filename
safe_name = re.sub(r'[^\w\s-]', '', profile.full_name).strip()
safe_name = re.sub(r'[-\s]+', '_', safe_name)
safe_name = re.sub(r"[^\w\s-]", "", profile.full_name).strip()
safe_name = re.sub(r"[-\s]+", "_", safe_name)
filename = f"{safe_name}_{profile.mep_id}.json"
filepath = os.path.join(self.output_dir, filename)
# Convert to dictionary and save
profile_dict = asdict(profile)
with open(filepath, 'w', encoding='utf-8') as f:
with open(filepath, "w", encoding="utf-8") as f:
json.dump(profile_dict, f, indent=2, ensure_ascii=False)
if self.verbose:
logger.info(f"Saved personality profile: {filepath}")
return filepath
def scrape_all_mep_personalities(self, xml_file: str = "EU.xml", delay: float = 1.0) -> Dict[str, str]:
def scrape_all_mep_personalities(
self, xml_file: str = "EU.xml", delay: float = 1.0
) -> Dict[str, str]:
"""
Scrape personality data for all MEPs.
Args:
xml_file: Path to EU.xml file
delay: Delay between requests to be respectful to Wikipedia
Returns:
Dictionary mapping MEP names to their personality profile file paths
"""
meps = self.extract_mep_data_from_xml(xml_file)
profile_files = {}
if self.verbose:
logger.info(f"Starting personality scraping for {len(meps)} MEPs")
logger.info(
f"Starting personality scraping for {len(meps)} MEPs"
)
for i, mep_data in enumerate(meps, 1):
mep_name = mep_data['full_name']
mep_name = mep_data["full_name"]
if self.verbose:
logger.info(f"Processing {i}/{len(meps)}: {mep_name}")
try:
# Create personality profile
profile = self.create_personality_profile(mep_data)
# Save profile
filepath = self.save_personality_profile(profile)
profile_files[mep_name] = filepath
# Respectful delay
time.sleep(delay)
except Exception as e:
logger.error(f"Error processing {mep_name}: {e}")
continue
if self.verbose:
logger.info(f"Completed personality scraping. {len(profile_files)} profiles created.")
logger.info(
f"Completed personality scraping. {len(profile_files)} profiles created."
)
return profile_files
def load_personality_profile(self, filepath: str) -> MEPPersonalityProfile:
def load_personality_profile(
self, filepath: str
) -> MEPPersonalityProfile:
"""
Load personality profile from JSON file.
Args:
filepath: Path to personality profile JSON file
Returns:
MEPPersonalityProfile object
"""
with open(filepath, 'r', encoding='utf-8') as f:
with open(filepath, "r", encoding="utf-8") as f:
data = json.load(f)
return MEPPersonalityProfile(**data)
def get_personality_summary(self, profile: MEPPersonalityProfile) -> str:
def get_personality_summary(
self, profile: MEPPersonalityProfile
) -> str:
"""
Generate a personality summary for use in AI agent system prompts.
Args:
profile: MEPPersonalityProfile object
Returns:
Formatted personality summary
"""
summary_parts = []
if profile.summary:
summary_parts.append(f"Background: {profile.summary}")
if profile.political_career:
summary_parts.append(f"Political Career: {profile.political_career}")
summary_parts.append(
f"Political Career: {profile.political_career}"
)
if profile.political_views:
summary_parts.append(f"Political Views: {profile.political_views}")
summary_parts.append(
f"Political Views: {profile.political_views}"
)
if profile.policy_focus:
summary_parts.append(f"Policy Focus: {profile.policy_focus}")
summary_parts.append(
f"Policy Focus: {profile.policy_focus}"
)
if profile.achievements:
summary_parts.append(f"Notable Achievements: {profile.achievements}")
summary_parts.append(
f"Notable Achievements: {profile.achievements}"
)
if profile.education:
summary_parts.append(f"Education: {profile.education}")
if profile.professional_background:
summary_parts.append(f"Professional Background: {profile.professional_background}")
summary_parts.append(
f"Professional Background: {profile.professional_background}"
)
return "\n".join(summary_parts)
def main():
"""Main function to run the Wikipedia personality scraper."""
print("🏛️ WIKIPEDIA PERSONALITY SCRAPER FOR EUROSWARM PARLIAMENT")
print("=" * 70)
# Initialize scraper
scraper = WikipediaPersonalityScraper(output_dir="mep_personalities", verbose=True)
scraper = WikipediaPersonalityScraper(
output_dir="mep_personalities", verbose=True
)
# Scrape all MEP personalities
profile_files = scraper.scrape_all_mep_personalities(delay=1.0)
print(f"\n✅ Scraping completed!")
print("\n✅ Scraping completed!")
print(f"📁 Profiles saved to: {scraper.output_dir}")
print(f"📊 Total profiles created: {len(profile_files)}")
# Show sample profile
if profile_files:
sample_name = list(profile_files.keys())[0]
sample_file = profile_files[sample_name]
sample_profile = scraper.load_personality_profile(sample_file)
print(f"\n📋 Sample Profile: {sample_name}")
print("-" * 50)
print(scraper.get_personality_summary(sample_profile))
if __name__ == "__main__":
main()
main()

@ -1,170 +0,0 @@
from loguru import logger
from swarms.structs.swarm_eval import (
SwarmEvaluator,
PRESET_DATASETS,
)
import os
from swarms import Agent
from dotenv import load_dotenv
from swarm_models import OpenAIChat
load_dotenv()
model = OpenAIChat(
model_name="deepseek-ai/DeepSeek-R1-Distill-Llama-70B-free",
openai_api_key=os.getenv("TOGETHER_API_KEY"),
base_url="https://api.together.xyz/v1",
)
# Define system prompts for reasoning agents
THINKING_AGENT_PROMPT = """You are a sophisticated analytical and strategic thinking agent focused on deep problem analysis and solution design.
Your core capabilities include:
1. Comprehensive Problem Analysis
- Break down complex problems into constituent elements
- Map relationships and dependencies between components
- Identify root causes and underlying patterns
- Consider historical context and precedents
2. Multi-Perspective Evaluation
- Examine issues from multiple stakeholder viewpoints
- Consider short-term and long-term implications
- Evaluate social, economic, technical, and ethical dimensions
- Challenge assumptions and identify potential biases
3. Risk Assessment and Mitigation
- Conduct thorough risk analysis across scenarios
- Identify potential failure modes and edge cases
- Develop contingency plans and mitigation strategies
- Assess probability and impact of various outcomes
4. Strategic Solution Development
- Generate multiple solution approaches
- Evaluate trade-offs between different strategies
- Consider resource constraints and limitations
- Design scalable and sustainable solutions
5. Decision Framework Creation
- Establish clear evaluation criteria
- Weight competing priorities appropriately
- Create structured decision matrices
- Document reasoning and key decision factors
6. Systems Thinking
- Map interconnections between system elements
- Identify feedback loops and cascade effects
- Consider emergent properties and behaviors
- Account for dynamic system evolution
Your output should always include:
- Clear articulation of your analytical process
- Key assumptions and their justification
- Potential risks and mitigation strategies
- Multiple solution options with pros/cons
- Specific recommendations with supporting rationale
- Areas of uncertainty requiring further investigation
Focus on developing robust, well-reasoned strategies that account for complexity while remaining practical and actionable."""
ACTION_AGENT_PROMPT = """You are an advanced implementation and execution agent focused on turning strategic plans into concrete results.
Your core capabilities include:
1. Strategic Implementation Planning
- Break down high-level strategies into specific actions
- Create detailed project roadmaps and timelines
- Identify critical path dependencies
- Establish clear milestones and success metrics
- Design feedback and monitoring mechanisms
2. Resource Optimization
- Assess resource requirements and constraints
- Optimize resource allocation and scheduling
- Identify efficiency opportunities
- Plan for scalability and flexibility
- Manage competing priorities effectively
3. Execution Management
- Develop detailed implementation procedures
- Create clear operational guidelines
- Establish quality control measures
- Design progress tracking systems
- Build in review and adjustment points
4. Risk Management
- Implement specific risk mitigation measures
- Create early warning systems
- Develop contingency procedures
- Establish fallback positions
- Monitor risk indicators
5. Stakeholder Management
- Identify key stakeholders and their needs
- Create communication plans
- Establish feedback mechanisms
- Manage expectations effectively
- Build support and buy-in
6. Continuous Improvement
- Monitor implementation effectiveness
- Gather and analyze performance data
- Identify improvement opportunities
- Implement iterative enhancements
- Document lessons learned
Your output should always include:
- Detailed action plans with specific steps
- Resource requirements and allocation plans
- Timeline with key milestones
- Success metrics and monitoring approach
- Risk mitigation procedures
- Communication and stakeholder management plans
- Quality control measures
- Feedback and adjustment mechanisms
Focus on practical, efficient, and effective implementation while maintaining high quality standards and achieving desired outcomes."""
# Initialize the thinking agent
thinking_agent = Agent(
agent_name="Strategic-Thinker",
agent_description="Deep analysis and strategic planning agent",
system_prompt=THINKING_AGENT_PROMPT,
max_loops=1,
llm=model,
dynamic_temperature_enabled=True,
)
class DeepSeekSwarm:
    """Two-pass wrapper around the module-level thinking agent.

    The first pass produces a draft answer for the task; the draft is then
    fed back through the same agent and the refined output is returned.
    """

    def __init__(self):
        # Reuse the shared Strategic-Thinker agent for both passes.
        self.thinking_agent = thinking_agent

    def run(self, task: str):
        """Run `task` through the thinking agent twice and return the
        second-pass (refined) result."""
        draft = self.thinking_agent.run(task)
        refined = self.thinking_agent.run(draft)
        return refined
if __name__ == "__main__":
    # Initialize the swarm (replace with your actual multi-agent system)
    swarm = DeepSeekSwarm()
    # Initialize the evaluator with the swarm instance
    evaluator = SwarmEvaluator(swarm)
    logger.info("Starting evaluation for dataset: gsm8k")
    # Evaluate gsm8k with one worker per CPU core, a progress bar, and
    # results persisted to a text file.
    # NOTE(review): os.cpu_count() may return None on some platforms,
    # which evaluate() would pass straight to ThreadPoolExecutor — confirm
    # that default is acceptable.
    results = evaluator.evaluate(
        "gsm8k",
        split="train",
        config=PRESET_DATASETS["gsm8k"],
        max_workers=os.cpu_count(),
        max_retries=3,
        show_progress=True,
        output_file="gsm8k_results.txt",
    )
    logger.info(f"Results for gsm8k: {results}")

@ -1,6 +1,6 @@
torch>=2.1.1,<3.0
transformers>=4.39.0,<4.51.0
asyncio>=3.4.3,<5.0
toml
pypdf==5.1.0
ratelimit==2.2.1

@ -1,4 +1,3 @@
#!/usr/bin/env python3
"""
Test script to verify Swarms installation in Docker container.
@ -7,53 +6,59 @@ Test script to verify Swarms installation in Docker container.
import sys
from typing import Dict, Any
def test_swarms_import() -> Dict[str, Any]:
"""
Test that swarms can be imported and basic functionality works.
Returns:
Dict[str, Any]: Test results
"""
try:
import swarms
print(f" Swarms imported successfully. Version: {swarms.__version__}")
print(
f" Swarms imported successfully. Version: {swarms.__version__}"
)
# Test basic functionality
from swarms import Agent
print(" Agent class imported successfully")
return {
"status": "success",
"version": swarms.__version__,
"message": "Swarms package is working correctly"
"message": "Swarms package is working correctly",
}
except ImportError as e:
print(f" Failed to import swarms: {e}")
return {
"status": "error",
"error": str(e),
"message": "Swarms package import failed"
"message": "Swarms package import failed",
}
except Exception as e:
print(f" Unexpected error: {e}")
return {
"status": "error",
"status": "error",
"error": str(e),
"message": "Unexpected error occurred"
"message": "Unexpected error occurred",
}
def main() -> None:
"""Main function to run tests."""
print(" Testing Swarms Docker Image...")
print("=" * 50)
# Test Python version
print(f"Python version: {sys.version}")
# Test swarms import
result = test_swarms_import()
print("=" * 50)
if result["status"] == "success":
print(" All tests passed! Docker image is working correctly.")
@ -62,5 +67,6 @@ def main() -> None:
print(" Tests failed! Please check the Docker image.")
sys.exit(1)
if __name__ == "__main__":
main()
main()

@ -0,0 +1,27 @@
from swarms.sims.senator_assembly import SenatorAssembly
def main():
    """
    Runs a simulation of a Senate vote on a bill proposing significant tax cuts for all Americans.

    The bill is described in realistic legislative terms, and the simulation
    uses the assembly's concurrent voting model, polling senators in
    batches of 10.
    """
    # Build the full senator assembly backed by a single LLM model.
    senator_simulation = SenatorAssembly(
        model_name="claude-sonnet-4-20250514"
    )
    # The positional argument is the bill text put before the assembly.
    senator_simulation.simulate_vote_concurrent(
        (
            "A bill proposing a significant reduction in federal income tax rates for all American citizens. "
            "The legislation aims to lower tax brackets across the board, increase the standard deduction, "
            "and provide additional tax relief for middle- and lower-income families. Proponents argue that "
            "the bill will stimulate economic growth, increase disposable income, and enhance consumer spending. "
            "Opponents raise concerns about the potential impact on the federal deficit, funding for public services, "
            "and long-term fiscal responsibility. Senators must weigh the economic, social, and budgetary implications "
            "before casting their votes."
        ),
        batch_size=10,
    )


if __name__ == "__main__":
    main()

@ -1,5 +1,6 @@
from swarms.structs.agent import Agent
from swarms.structs.agent_builder import AgentsBuilder
from swarms.structs.agent_rearrange import AgentRearrange, rearrange
from swarms.structs.auto_swarm_builder import AutoSwarmBuilder
from swarms.structs.base_structure import BaseStructure
from swarms.structs.base_swarm import BaseSwarm
@ -9,7 +10,7 @@ from swarms.structs.board_of_directors_swarm import (
)
from swarms.structs.concurrent_workflow import ConcurrentWorkflow
from swarms.structs.conversation import Conversation
from swarms.structs.council_judge import CouncilAsAJudge
from swarms.structs.council_as_judge import CouncilAsAJudge
from swarms.structs.cron_job import CronJob
from swarms.structs.de_hallucination_swarm import DeHallucinationSwarm
from swarms.structs.deep_research_swarm import DeepResearchSwarm
@ -66,11 +67,10 @@ from swarms.structs.multi_agent_exec import (
run_single_agent,
)
from swarms.structs.multi_agent_router import MultiAgentRouter
from swarms.structs.rearrange import AgentRearrange, rearrange
from swarms.structs.round_robin import RoundRobinSwarm
from swarms.structs.sequential_workflow import SequentialWorkflow
from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm
from swarms.structs.swarm_arange import SwarmRearrange
from swarms.structs.swarm_rearrange import SwarmRearrange
from swarms.structs.swarm_router import (
SwarmRouter,
SwarmType,

@ -21,6 +21,13 @@ from typing import (
import toml
import yaml
from litellm import model_list
from litellm.utils import (
get_max_tokens,
supports_function_calling,
supports_parallel_function_calling,
supports_vision,
)
from loguru import logger
from pydantic import BaseModel
@ -45,7 +52,6 @@ from swarms.schemas.base_schemas import (
ChatMessageResponse,
)
from swarms.schemas.conversation_schema import ConversationSchema
from swarms.schemas.llm_agent_schema import ModelConfigOrigin
from swarms.schemas.mcp_schemas import (
MCPConnection,
)
@ -422,7 +428,6 @@ class Agent:
mcp_config: Optional[MCPConnection] = None,
top_p: Optional[float] = 0.90,
conversation_schema: Optional[ConversationSchema] = None,
aditional_llm_config: Optional[ModelConfigOrigin] = None,
llm_base_url: Optional[str] = None,
llm_api_key: Optional[str] = None,
rag_config: Optional[RAGConfig] = None,
@ -430,8 +435,8 @@ class Agent:
output_raw_json_from_tool_call: bool = False,
summarize_multiple_images: bool = False,
tool_retry_attempts: int = 3,
speed_mode: str = None,
reasoning_prompt_on: bool = True,
dynamic_context_window: bool = True,
*args,
**kwargs,
):
@ -562,7 +567,6 @@ class Agent:
self.mcp_config = mcp_config
self.top_p = top_p
self.conversation_schema = conversation_schema
self.aditional_llm_config = aditional_llm_config
self.llm_base_url = llm_base_url
self.llm_api_key = llm_api_key
self.rag_config = rag_config
@ -572,8 +576,8 @@ class Agent:
)
self.summarize_multiple_images = summarize_multiple_images
self.tool_retry_attempts = tool_retry_attempts
self.speed_mode = speed_mode
self.reasoning_prompt_on = reasoning_prompt_on
self.dynamic_context_window = dynamic_context_window
# Initialize the feedback
self.feedback = []
@ -660,11 +664,13 @@ class Agent:
# Add agent name, description, and instructions to the prompt
if self.agent_name is not None:
prompt += f"\n Name: {self.agent_name}"
prompt += f"\n Your Name: {self.agent_name} \n"
elif self.agent_description is not None:
prompt += f"\n Description: {self.agent_description}"
prompt += (
f"\n Your Description: {self.agent_description} \n"
)
elif self.system_prompt is not None:
prompt += f"\n Instructions: {self.system_prompt}"
prompt += f"\n Your Instructions: {self.system_prompt} \n"
else:
prompt = self.system_prompt
@ -674,29 +680,15 @@ class Agent:
# Initialize the short term memory
memory = Conversation(
name=f"{self.agent_name}_conversation",
system_prompt=prompt,
user=self.user_name,
rules=self.rules,
token_count=(
self.conversation_schema.count_tokens
if self.conversation_schema
else False
),
message_id_on=(
self.conversation_schema.message_id_on
if self.conversation_schema
else False
),
time_enabled=(
self.conversation_schema.time_enabled
if self.conversation_schema
else False
),
)
# Add the system prompt to the conversation
memory.add(
role="System",
content=prompt,
token_count=False,
message_id_on=False,
time_enabled=True,
dynamic_context_window=self.dynamic_context_window,
tokenizer_model_name=self.model_name,
context_length=self.context_length,
)
return memory
@ -898,11 +890,7 @@ class Agent:
Returns:
bool: True if model supports vision and image is provided, False otherwise.
"""
from litellm.utils import (
supports_function_calling,
supports_parallel_function_calling,
supports_vision,
)
# Only check vision support if an image is provided
if img is not None:
@ -1304,8 +1292,6 @@ class Agent:
self._handle_run_error(error)
def __handle_run_error(self, error: any):
import traceback
if self.autosave is True:
self.save()
log_agent_data(self.to_dict())
@ -1549,11 +1535,6 @@ class Agent:
raise
def reliability_check(self):
from litellm import model_list
from litellm.utils import (
get_max_tokens,
supports_function_calling,
)
if self.system_prompt is None:
logger.warning(

@ -3,19 +3,17 @@ import uuid
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Optional, Union
from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.conversation import Conversation
from swarms.structs.multi_agent_exec import get_agents_info
from swarms.telemetry.main import log_agent_data
from swarms.utils.any_to_str import any_to_str
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
from swarms.utils.loguru_logger import initialize_logger
from swarms.telemetry.main import log_agent_data
from swarms.structs.conversation import Conversation
from swarms.utils.output_types import OutputType
from swarms.structs.multi_agent_exec import get_agents_info
logger = initialize_logger(log_folder="rearrange")

@ -7,10 +7,15 @@ from loguru import logger
import traceback
class BatchAgentExecutionError(Exception):
    """Raised when a batch agent execution run fails.

    The message carries the formatted underlying error and its traceback.
    """

    pass
def batch_agent_execution(
agents: List[Union[Agent, Callable]],
tasks: List[str] = None,
imgs: List[str] = None,
max_workers: int = max(1, int(os.cpu_count() * 0.9)),
):
"""
Execute a batch of agents on a list of tasks concurrently.
@ -38,9 +43,6 @@ def batch_agent_execution(
results = []
# Calculate max workers as 90% of available CPU cores
max_workers = max(1, int(os.cpu_count() * 0.9))
formatter.print_panel(
f"Executing {len(agents)} agents on {len(tasks)} tasks using {max_workers} workers"
)
@ -78,5 +80,7 @@ def batch_agent_execution(
return results
except Exception as e:
log = f"Batch agent execution failed Error: {str(e)} Traceback: {traceback.format_exc()}"
logger.error(log)
raise e
raise BatchAgentExecutionError(log)

@ -295,7 +295,7 @@ class ConcurrentWorkflow(BaseSwarm):
def display_agent_dashboard(
self,
title: str = "🤖 Agent Dashboard",
title: str = "ConcurrentWorkflow Dashboard",
is_final: bool = False,
) -> None:
"""
@ -307,7 +307,7 @@ class ConcurrentWorkflow(BaseSwarm):
Args:
title (str, optional): The dashboard title to display at the top.
Defaults to "🤖 Agent Dashboard".
Defaults to "🤖 ConcurrentWorkflow Dashboard".
is_final (bool, optional): Whether this is the final dashboard display
after all agents have completed. Changes formatting and styling.
Defaults to False.
@ -543,7 +543,8 @@ class ConcurrentWorkflow(BaseSwarm):
# Display final dashboard if enabled
if self.show_dashboard:
self.display_agent_dashboard(
"🎉 Final Agent Dashboard", is_final=True
"Final ConcurrentWorkflow Dashboard",
is_final=True,
)
return history_output_formatter(

@ -1,21 +1,21 @@
import traceback
import concurrent.futures
import datetime
import inspect
import json
import os
import traceback
import uuid
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Literal,
Optional,
Union,
Literal,
Any,
)
import yaml
import inspect
from swarms.utils.any_to_str import any_to_str
from swarms.utils.litellm_tokenizer import count_tokens
@ -26,6 +26,18 @@ if TYPE_CHECKING:
from loguru import logger
# Define available providers
providers = Literal[
"mem0",
"in-memory",
"supabase",
"redis",
"sqlite",
"duckdb",
"pulsar",
]
def generate_conversation_id():
    """Return a fresh random UUID4, rendered in its canonical 36-character
    hyphenated string form, to identify a conversation."""
    return f"{uuid.uuid4()}"
@ -50,18 +62,6 @@ def get_conversation_dir():
return conversation_dir
# Define available providers
providers = Literal[
"mem0",
"in-memory",
"supabase",
"redis",
"sqlite",
"duckdb",
"pulsar",
]
def _create_backend_conversation(backend: str, **kwargs):
"""
Create a backend conversation instance based on the specified backend type.
@ -183,9 +183,9 @@ class Conversation:
name: str = "conversation-test",
system_prompt: Optional[str] = None,
time_enabled: bool = False,
autosave: bool = False, # Changed default to False
autosave: bool = False,
save_filepath: str = None,
load_filepath: str = None, # New parameter to specify which file to load from
load_filepath: str = None,
context_length: int = 8192,
rules: str = None,
custom_rules_prompt: str = None,
@ -211,6 +211,8 @@ class Conversation:
redis_data_dir: Optional[str] = None,
conversations_dir: Optional[str] = None,
export_method: str = "json",
dynamic_context_window: bool = True,
caching: bool = True,
*args,
**kwargs,
):
@ -249,6 +251,8 @@ class Conversation:
self.auto_persist = auto_persist
self.redis_data_dir = redis_data_dir
self.export_method = export_method
self.dynamic_context_window = dynamic_context_window
self.caching = caching
if self.name is None:
self.name = id
@ -933,7 +937,15 @@ class Conversation:
# Fallback to in-memory implementation
pass
elif self.dynamic_context_window is True:
return self.dynamic_auto_chunking()
else:
return self._return_history_as_string_worker()
def _return_history_as_string_worker(self):
formatted_messages = []
for message in self.conversation_history:
formatted_messages.append(
f"{message['role']}: {message['content']}"
@ -1778,20 +1790,38 @@ class Conversation:
pass
self.conversation_history = []
def dynamic_auto_chunking(self):
    """Trim the rendered conversation history so it fits within
    ``self.context_length`` tokens.

    Returns:
        str: The full history string when it already fits, otherwise the
        history with the leading overflow sliced off so the most recent
        content is retained.

    NOTE(review): the overflow is removed by slicing *characters* equal to
    the token-count difference, which only approximates token-level
    trimming — confirm against the tokenizer's behavior.
    """
    all_tokens = self._return_history_as_string_worker()
    total_tokens = count_tokens(
        all_tokens, self.tokenizer_model_name
    )
    if total_tokens <= self.context_length:
        # Bug fix: the previous implementation fell through and returned
        # None when the history already fit within the context window.
        return all_tokens
    # Get the difference between the count_tokens and the context_length,
    # then drop that many leading characters from the history string.
    difference = total_tokens - self.context_length
    return all_tokens[difference:]
# # Example usage
# # conversation = Conversation()
# conversation = Conversation(token_count=True)
# Example usage
# conversation = Conversation()
# conversation = Conversation(token_count=True, context_length=14)
# conversation.add("user", "Hello, how are you?")
# conversation.add("assistant", "I am doing well, thanks.")
# conversation.add("user", "What is the weather in Tokyo?")
# print(conversation.dynamic_auto_chunking())
# # conversation.add(
# # "assistant", {"name": "tool_1", "output": "Hello, how are you?"}
# # )
# # print(conversation.return_json())
# )
# print(conversation.return_json())
# # # print(conversation.get_last_message_as_string())
# # print(conversation.get_last_message_as_string())
# print(conversation.return_json())
# # # conversation.add("assistant", "I am doing well, thanks.")
# # # # print(conversation.to_json())
# # print(type(conversation.to_dict()))
# # print(conversation.to_yaml())
# # conversation.add("assistant", "I am doing well, thanks.")
# # # print(conversation.to_json())
# print(type(conversation.to_dict()))
# print(conversation.to_yaml())

@ -1,306 +0,0 @@
import json
from typing import Any, List
from loguru import logger
from pydantic import BaseModel, Field
from swarms import Agent
class AgentOutput(BaseModel):
    """
    Schema for capturing metadata and results of an agent run.
    """

    # Name of the agent that produced this output.
    agent_name: str = Field(..., description="Name of the agent.")
    # The exact query string the agent was run with.
    input_query: str = Field(
        ..., description="Input query provided to the agent."
    )
    # Raw result; left as Any since agents may return strings or
    # structured data.
    output_result: Any = Field(
        ..., description="Result produced by the agent."
    )
    # Free-form context (e.g. matrix row/col coordinates in MatrixSwarm).
    metadata: dict = Field(
        ..., description="Additional metadata about the agent run."
    )
class MatrixSwarm:
    """
    A class to manage a matrix of agents and perform matrix operations similar to linear algebra.

    Agents are arranged as a 2D list (rows x columns). The operations
    mirror linear-algebra naming but act on Agent objects, so several are
    structural rather than numeric — see the NOTE(review) comments below.
    """

    def __init__(self, agents: List[List[Agent]]):
        """
        Initializes the MatrixSwarm with a 2D list of agents.

        Args:
            agents (List[List[Agent]]): 2D list of agents representing the matrix.

        Raises:
            ValueError: If `agents` is empty, not a 2D list, or contains
                elements that are not `Agent` instances.
        """
        if not agents or not all(
            isinstance(row, list) for row in agents
        ):
            raise ValueError("Agents must be provided as a 2D list.")
        if not all(
            isinstance(agent, Agent)
            for row in agents
            for agent in row
        ):
            raise ValueError(
                "All elements of the matrix must be instances of `Agent`."
            )
        self.agents = agents
        self.outputs = []  # List to store outputs as AgentOutput

    def validate_dimensions(self, other: "MatrixSwarm") -> None:
        """
        Validates that two matrices have compatible dimensions for operations.

        Args:
            other (MatrixSwarm): Another MatrixSwarm.

        Raises:
            ValueError: If dimensions are incompatible.
        """
        # Only the row count and the first row's length are compared;
        # ragged rows beyond row 0 are not detected here.
        if len(self.agents) != len(other.agents) or len(
            self.agents[0]
        ) != len(other.agents[0]):
            raise ValueError(
                "Matrix dimensions are incompatible for this operation."
            )

    def transpose(self) -> "MatrixSwarm":
        """
        Transposes the matrix of agents (swap rows and columns).

        Returns:
            MatrixSwarm: A new transposed MatrixSwarm.
        """
        transposed_agents = [
            [self.agents[j][i] for j in range(len(self.agents))]
            for i in range(len(self.agents[0]))
        ]
        return MatrixSwarm(transposed_agents)

    def add(self, other: "MatrixSwarm") -> "MatrixSwarm":
        """
        Adds two matrices element-wise.

        Args:
            other (MatrixSwarm): Another MatrixSwarm to add.

        Returns:
            MatrixSwarm: A new MatrixSwarm resulting from the addition.

        NOTE(review): after validating dimensions this simply copies
        `self`'s agents — `other` contributes nothing. Element-wise
        "addition" of agents is not actually implemented; confirm the
        intended semantics.
        """
        self.validate_dimensions(other)
        added_agents = [
            [self.agents[i][j] for j in range(len(self.agents[i]))]
            for i in range(len(self.agents))
        ]
        return MatrixSwarm(added_agents)

    def scalar_multiply(self, scalar: int) -> "MatrixSwarm":
        """
        Scales the agents by duplicating them scalar times along the row.

        Args:
            scalar (int): The scalar multiplier.

        Returns:
            MatrixSwarm: A new MatrixSwarm where each row is repeated
            `scalar` times along the row (row length becomes
            scalar * original length).
        """
        scaled_agents = [
            [agent for _ in range(scalar) for agent in row]
            for row in self.agents
        ]
        return MatrixSwarm(scaled_agents)

    def multiply(
        self, other: "MatrixSwarm", inputs: List[str]
    ) -> List[List[AgentOutput]]:
        """
        Multiplies two matrices (dot product between rows and columns).

        Each "dot product" runs the row agents of `self` on the row's
        input query and joins their string outputs; the paired column
        agents of `other` are zipped alongside but never run themselves.

        Args:
            other (MatrixSwarm): Another MatrixSwarm for multiplication.
            inputs (List[str]): A list of input queries, one per row of `self`.

        Returns:
            List[List[AgentOutput]]: A resulting matrix of outputs after multiplication.

        Raises:
            ValueError: If inner dimensions do not match.
        """
        if len(self.agents[0]) != len(other.agents):
            raise ValueError(
                "Matrix dimensions are incompatible for multiplication."
            )

        results = []
        for i, row in enumerate(self.agents):
            row_results = []
            for col_idx in range(len(other.agents[0])):
                # Collect the col_idx-th column of `other`.
                col = [
                    other.agents[row_idx][col_idx]
                    for row_idx in range(len(other.agents))
                ]
                query = inputs[
                    i
                ]  # Input query for the corresponding row
                intermediate_result = []
                for agent_r, agent_c in zip(row, col):
                    try:
                        result = agent_r.run(query)
                        intermediate_result.append(result)
                    except Exception as e:
                        # Record failures in-band so one bad agent does
                        # not abort the whole product.
                        intermediate_result.append(f"Error: {e}")
                # Aggregate outputs from dot product
                combined_result = " ".join(
                    intermediate_result
                )  # Example aggregation
                row_results.append(
                    AgentOutput(
                        agent_name=f"DotProduct-{i}-{col_idx}",
                        input_query=query,
                        output_result=combined_result,
                        metadata={"row": i, "col": col_idx},
                    )
                )
            results.append(row_results)
        return results

    def subtract(self, other: "MatrixSwarm") -> "MatrixSwarm":
        """
        Subtracts two matrices element-wise.

        Args:
            other (MatrixSwarm): Another MatrixSwarm to subtract.

        Returns:
            MatrixSwarm: A new MatrixSwarm resulting from the subtraction.

        NOTE(review): like `add`, this copies `self`'s agents after
        dimension validation; `other` is unused beyond validation.
        """
        self.validate_dimensions(other)
        subtracted_agents = [
            [self.agents[i][j] for j in range(len(self.agents[i]))]
            for i in range(len(self.agents))
        ]
        return MatrixSwarm(subtracted_agents)

    def identity(self, size: int) -> "MatrixSwarm":
        """
        Creates an identity matrix of agents with size `size`.

        Diagonal entries reuse this matrix's own agents; off-diagonal
        entries are fresh "Zero-Agent" placeholders with an empty prompt.

        Args:
            size (int): Size of the identity matrix (NxN).

        Returns:
            MatrixSwarm: An identity MatrixSwarm.

        NOTE(review): indexes self.agents[i][i] for i < size, so `size`
        must not exceed this matrix's dimensions — confirm callers.
        """
        identity_agents = [
            [
                (
                    self.agents[i][j]
                    if i == j
                    else Agent(
                        agent_name=f"Zero-Agent-{i}-{j}",
                        system_prompt="",
                    )
                )
                for j in range(size)
            ]
            for i in range(size)
        ]
        return MatrixSwarm(identity_agents)

    def determinant(self) -> Any:
        """
        Computes the determinant of a square MatrixSwarm.

        Returns:
            Any: Determinant of the matrix (as agent outputs).

        Raises:
            ValueError: If the matrix is not square.

        NOTE(review): `agent.run(...)` returns a string, so the
        int-by-string multiplication in `cofactor` repeats (or empties)
        the string, and `det_result += cofactor * ...` then mixes int and
        str — for matrices larger than 1x1 this likely raises TypeError.
        Confirm before relying on this method.
        """
        if len(self.agents) != len(self.agents[0]):
            raise ValueError(
                "Determinant can only be computed for square matrices."
            )

        # Recursive determinant calculation (example using placeholder logic)
        if len(self.agents) == 1:
            return self.agents[0][0].run("Compute determinant")

        det_result = 0
        for i in range(len(self.agents)):
            # Minor: drop row 0 and column i.
            submatrix = MatrixSwarm(
                [row[:i] + row[i + 1 :] for row in self.agents[1:]]
            )
            cofactor = ((-1) ** i) * self.agents[0][i].run(
                "Compute determinant"
            )
            det_result += cofactor * submatrix.determinant()
        return det_result

    def save_to_file(self, path: str) -> None:
        """
        Saves the agent matrix structure and metadata to a file.

        Writes a JSON document containing agent names (by position) and
        any accumulated AgentOutput records. Errors are logged, not raised.

        Args:
            path (str): File path to save the matrix.
        """
        try:
            matrix_data = {
                "agents": [
                    [agent.agent_name for agent in row]
                    for row in self.agents
                ],
                "outputs": [output.dict() for output in self.outputs],
            }
            with open(path, "w") as f:
                json.dump(matrix_data, f, indent=4)
            logger.info(f"MatrixSwarm saved to {path}")
        except Exception as e:
            logger.error(f"Error saving MatrixSwarm: {e}")
# # Example usage
# if __name__ == "__main__":
# from swarms.prompts.finance_agent_sys_prompt import (
# FINANCIAL_AGENT_SYS_PROMPT,
# )
# # Create a 3x3 matrix of agents
# agents = [
# [
# Agent(
# agent_name=f"Agent-{i}-{j}",
# system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
# model_name="gpt-4o-mini",
# max_loops=1,
# autosave=True,
# dashboard=False,
# verbose=True,
# dynamic_temperature_enabled=True,
# saved_state_path=f"agent_{i}_{j}.json",
# user_name="swarms_corp",
# retry_attempts=1,
# context_length=200000,
# return_step_meta=False,
# output_type="string",
# streaming_on=False,
# )
# for j in range(3)
# ]
# for i in range(3)
# ]
# # Initialize the matrix
# agent_matrix = MatrixSwarm(agents)
# # Example queries
# inputs = [
# "Explain Roth IRA benefits",
# "Differences between ETFs and mutual funds",
# "How to create a diversified portfolio",
# ]
# # Run agents
# outputs = agent_matrix.multiply(agent_matrix.transpose(), inputs)
# # Save results
# agent_matrix.save_to_file("agent_matrix_results.json")

@ -2,7 +2,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, List, Optional, Union
from swarms.structs.agent import Agent
from swarms.structs.rearrange import AgentRearrange
from swarms.structs.agent_rearrange import AgentRearrange
from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.output_types import OutputType

@ -1,326 +0,0 @@
import math
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Dict, Optional, Tuple
from datasets import Dataset, load_dataset
from loguru import logger
from tqdm import tqdm
# -----------------------------------------------------------------------------
# Logging configuration: log to console and file (rotating by size)
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# Swarm interface example
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# Benchmark configuration
# -----------------------------------------------------------------------------
class BenchmarkConfig:
    """Describes how to read one benchmark dataset.

    Attributes:
        input_column (str): Column holding the task prompt.
        answer_column (str): Column holding the expected answer.
        answer_extractor (Optional[Callable[[Any], str]]): Optional
            callable converting the raw answer cell into a plain string.
        answer_matcher (Optional[Callable[[str, str], bool]]): Optional
            callable comparing (expected, output); when None, the
            evaluator falls back to a substring containment check.
    """

    def __init__(
        self,
        input_column: str,
        answer_column: str,
        answer_extractor: Optional[Callable[[Any], str]] = None,
        answer_matcher: Optional[Callable[[str, str], bool]] = None,
    ):
        # Plain attribute copies; no validation is performed here.
        self.answer_matcher = answer_matcher
        self.answer_extractor = answer_extractor
        self.answer_column = answer_column
        self.input_column = input_column
# -----------------------------------------------------------------------------
# Preset dataset configurations for popular benchmarks
# -----------------------------------------------------------------------------
# Preset column mappings for popular Hugging Face benchmarks; evaluate()
# falls back to these when no explicit BenchmarkConfig is supplied.
PRESET_DATASETS: Dict[str, BenchmarkConfig] = {
    "gsm8k": BenchmarkConfig(
        input_column="question",
        answer_column="answer",
    ),
    "squad": BenchmarkConfig(
        input_column="question",
        answer_column="answers",
        # SQuAD answers may arrive as a dict with a "text" list; take the
        # first variant, falling back to str() for unexpected shapes.
        answer_extractor=lambda ans: (
            ans["text"][0]
            if isinstance(ans, dict)
            and "text" in ans
            and isinstance(ans["text"], list)
            and ans["text"]
            else str(ans)
        ),
    ),
    "winogrande": BenchmarkConfig(
        input_column="sentence",
        answer_column="answer",
    ),
    "commonsense_qa": BenchmarkConfig(
        input_column="question",
        answer_column="answerKey",
    ),
    # Add additional presets here.
}
# -----------------------------------------------------------------------------
# SwarmEvaluator with extended features
# -----------------------------------------------------------------------------
class SwarmEvaluator:
    """
    Evaluator that uses a swarm of agents to process benchmark datasets
    from Hugging Face, with concurrency, retries, progress display, performance timing,
    and customizable answer matching.

    Example:
        swarm = Swarm()
        evaluator = SwarmEvaluator(swarm)
        results = evaluator.evaluate("gsm8k", split="test", max_workers=4)
        print(results)
    """

    def __init__(self, swarm: callable) -> None:
        """
        Initialize the evaluator with a given swarm.

        Args:
            swarm (Swarm): A swarm instance with a callable run(task: str) method.
        """
        self.swarm = swarm

    def evaluate(
        self,
        dataset_name: str,
        split: str = "test",
        config: Optional[BenchmarkConfig] = None,
        max_workers: int = 1,
        max_retries: int = 3,
        show_progress: bool = True,
        output_file: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Evaluate the specified benchmark dataset using the swarm.

        Args:
            dataset_name (str): The dataset name (from Hugging Face).
            split (str): The dataset split (e.g., "test", "validation").
            config (Optional[BenchmarkConfig]): Benchmark configuration. If None,
                a preset config is used.
            max_workers (int): Number of concurrent workers.
            max_retries (int): Number of retries for swarm tasks on failure
                (so up to max_retries + 1 attempts per example).
            show_progress (bool): If True, display a progress bar.
            output_file (Optional[str]): Path to a file to write the results.

        Returns:
            Dict[str, Any]: Evaluation metrics including total examples, correct answers,
                accuracy, and total evaluation time.

        Raises:
            ValueError: If no config is given and no preset exists for
                `dataset_name`.
        """
        if config is None:
            config = PRESET_DATASETS.get(dataset_name)
            if config is None:
                raise ValueError(
                    f"No preset config for dataset '{dataset_name}'. Provide a BenchmarkConfig."
                )

        logger.info(
            f"Loading dataset '{dataset_name}' (split: {split})..."
        )
        dataset: Dataset = load_dataset(dataset_name, split=split)
        total_examples = len(dataset)
        logger.info(f"Total examples to evaluate: {total_examples}")

        start_time = time.time()
        correct = 0

        # Function to process a single example.
        # Returns (is_correct, elapsed_seconds); failures/skips count as
        # incorrect rather than raising.
        def _process_example(
            example: Dict[str, Any], idx: int
        ) -> Tuple[bool, float]:
            task_start = time.time()
            task_text = example.get(config.input_column)
            expected_answer = example.get(config.answer_column)
            if task_text is None or expected_answer is None:
                logger.warning(
                    f"Example {idx}: Missing '{config.input_column}' or '{config.answer_column}', skipping."
                )
                return (False, 0.0)

            # Use answer_extractor if provided.
            if config.answer_extractor:
                try:
                    expected_answer = config.answer_extractor(
                        expected_answer
                    )
                except Exception as e:
                    logger.error(
                        f"Example {idx}: Error extracting answer: {e}"
                    )
                    return (False, 0.0)

            logger.debug(f"Example {idx} - Task: {task_text}")
            logger.debug(
                f"Example {idx} - Expected Answer: {expected_answer}"
            )

            try:
                swarm_output = self._run_with_retry(
                    task_text, max_retries
                )
            except Exception as e:
                logger.error(
                    f"Example {idx}: Failed after retries. Error: {e}"
                )
                return (False, time.time() - task_start)

            logger.debug(
                f"Example {idx} - Swarm Output: {swarm_output}"
            )

            # Use custom matcher if provided; otherwise, default matching.
            if config.answer_matcher:
                is_correct = config.answer_matcher(
                    expected_answer, swarm_output
                )
            else:
                is_correct = self._default_matcher(
                    expected_answer, swarm_output
                )

            task_time = time.time() - task_start
            logger.info(
                f"Example {idx}: {'Correct' if is_correct else 'Incorrect'} in {task_time:.2f}s"
            )
            return (is_correct, task_time)

        # Use ThreadPoolExecutor for concurrency.
        futures = []
        total_time = 0.0
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Optionally wrap the dataset with tqdm for a progress bar.
            # NOTE(review): the bar wraps the *submission* loop (and
            # materializes the enumeration into a list), so it advances as
            # tasks are queued, not as they complete.
            examples_iter = enumerate(dataset, start=1)
            if show_progress:
                examples_iter = tqdm(
                    list(examples_iter),
                    total=total_examples,
                    desc="Evaluating",
                )
            for idx, example in examples_iter:
                futures.append(
                    executor.submit(_process_example, example, idx)
                )
            # Tally results as workers finish, in completion order.
            for future in as_completed(futures):
                try:
                    is_correct, elapsed = future.result()
                    total_time += elapsed
                    if is_correct:
                        correct += 1
                except Exception as e:
                    logger.error(f"Error processing an example: {e}")

        overall_time = time.time() - start_time
        accuracy = (
            correct / total_examples if total_examples > 0 else 0.0
        )

        logger.info(
            f"Evaluation complete. Total examples: {total_examples}, Correct: {correct}, "
            f"Accuracy: {accuracy:.2%}, Overall Time: {overall_time:.2f}s, "
            f"Average per-example time: {total_time/total_examples if total_examples else 0:.2f}s"
        )

        results = {
            "total": total_examples,
            "correct": correct,
            "accuracy": accuracy,
            "overall_time": overall_time,
            "average_example_time": (
                total_time / total_examples
                if total_examples
                else math.nan
            ),
        }

        # Optionally save results to a file.
        if output_file:
            try:
                with open(output_file, "w") as f:
                    for key, value in results.items():
                        f.write(f"{key}: {value}\n")
                logger.info(f"Results saved to {output_file}")
            except Exception as e:
                logger.error(
                    f"Error saving results to {output_file}: {e}"
                )

        return results

    def _run_with_retry(self, task: str, max_retries: int) -> str:
        """
        Runs the swarm task with a retry mechanism.

        Makes up to ``max_retries + 1`` attempts in total, sleeping
        0.5s * attempt-number between failed attempts.

        Args:
            task (str): The task string.
            max_retries (int): Maximum number of retries.

        Returns:
            str: Swarm output.

        Raises:
            Exception: If all retries fail.
        """
        attempt = 0
        while attempt <= max_retries:
            try:
                start = time.time()
                result = self.swarm.run(task)
                elapsed = time.time() - start
                logger.debug(
                    f"Task succeeded in {elapsed:.2f}s on attempt {attempt + 1}"
                )
                return result
            except Exception as e:
                logger.warning(
                    f"Task failed on attempt {attempt + 1}: {e}"
                )
                attempt += 1
                time.sleep(0.5 * attempt)  # Linearly increasing backoff (0.5s, 1.0s, ...)
        raise Exception("Max retries exceeded for task.")

    @staticmethod
    def _default_matcher(expected: str, output: str) -> bool:
        """
        Default answer matching using a normalized substring check.

        Both strings are whitespace-normalized (collapsed to single
        spaces) before the containment test.

        Args:
            expected (str): The expected answer.
            output (str): The swarm output.

        Returns:
            bool: True if expected is found in output; otherwise, False.
        """
        expected_norm = " ".join(expected.strip().split())
        output_norm = " ".join(output.strip().split())
        return expected_norm in output_norm
# -----------------------------------------------------------------------------
# Example usage
# -----------------------------------------------------------------------------

@ -2,4 +2,4 @@ import uuid
def generate_swarm_id():
    """Return a unique identifier for a swarm instance.

    The id is ``swarm-`` followed by the 32-character hex form of a
    random UUID4, e.g. ``swarm-0f8fad5bd9cb469fa165b7e3d01b7c19``.
    """
    # Merge-artifact fix: the block previously contained two return
    # statements (the old and new diff sides), making the updated,
    # prefixed form unreachable. Keep only the intended new form.
    return f"swarm-{uuid.uuid4().hex}"

@ -11,33 +11,31 @@ from swarms.prompts.multi_agent_collab_prompt import (
)
from swarms.structs.agent import Agent
from swarms.structs.concurrent_workflow import ConcurrentWorkflow
from swarms.structs.council_as_judge import CouncilAsAJudge
from swarms.structs.csv_to_agent import AgentLoader
from swarms.structs.deep_research_swarm import DeepResearchSwarm
from swarms.structs.groupchat import GroupChat
from swarms.structs.heavy_swarm import HeavySwarm
from swarms.structs.hiearchical_swarm import HierarchicalSwarm
from swarms.structs.interactive_groupchat import InteractiveGroupChat
from swarms.structs.ma_utils import list_all_agents
from swarms.structs.majority_voting import MajorityVoting
from swarms.structs.malt import MALT
from swarms.structs.mixture_of_agents import MixtureOfAgents
from swarms.structs.multi_agent_router import MultiAgentRouter
from swarms.structs.rearrange import AgentRearrange
from swarms.structs.agent_rearrange import AgentRearrange
from swarms.structs.sequential_workflow import SequentialWorkflow
from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm
from swarms.structs.swarm_matcher import swarm_matcher
from swarms.telemetry.log_executions import log_execution
from swarms.utils.output_types import OutputType
from swarms.utils.loguru_logger import initialize_logger
from swarms.structs.malt import MALT
from swarms.structs.deep_research_swarm import DeepResearchSwarm
from swarms.structs.council_judge import CouncilAsAJudge
from swarms.structs.interactive_groupchat import InteractiveGroupChat
from swarms.structs.heavy_swarm import HeavySwarm
from swarms.structs.ma_utils import list_all_agents
from swarms.utils.generate_keys import generate_api_key
from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.output_types import OutputType
logger = initialize_logger(log_folder="swarm_router")
SwarmType = Literal[
"AgentRearrange",
"MixtureOfAgents",
"SpreadSheetSwarm",
"SequentialWorkflow",
"ConcurrentWorkflow",
"GroupChat",
@ -146,7 +144,6 @@ class SwarmRouter:
Available Swarm Types:
- AgentRearrange: Optimizes agent arrangement for task execution
- MixtureOfAgents: Combines multiple agent types for diverse tasks
- SpreadSheetSwarm: Uses spreadsheet-like operations for task management
- SequentialWorkflow: Executes tasks sequentially
- ConcurrentWorkflow: Executes tasks in parallel
- "auto": Automatically selects best swarm type via embedding search
@ -179,7 +176,7 @@ class SwarmRouter:
description: str = "Routes your task to the desired swarm",
max_loops: int = 1,
agents: List[Union[Agent, Callable]] = [],
swarm_type: SwarmType = "SequentialWorkflow", # "SpreadSheetSwarm" # "auto"
swarm_type: SwarmType = "SequentialWorkflow", # "ConcurrentWorkflow" # "auto"
autosave: bool = False,
rearrange_flow: str = None,
return_json: bool = False,
@ -396,7 +393,6 @@ class SwarmRouter:
"MajorityVoting": self._create_majority_voting,
"GroupChat": self._create_group_chat,
"MultiAgentRouter": self._create_multi_agent_router,
"SpreadSheetSwarm": self._create_spreadsheet_swarm,
"SequentialWorkflow": self._create_sequential_workflow,
"ConcurrentWorkflow": self._create_concurrent_workflow,
}
@ -528,18 +524,6 @@ class SwarmRouter:
output_type=self.output_type,
)
def _create_spreadsheet_swarm(self, *args, **kwargs):
    """Factory function for SpreadSheetSwarm.

    Builds a SpreadSheetSwarm wired to this router's identity and agent
    roster; any extra positional/keyword arguments are forwarded
    verbatim to the SpreadSheetSwarm constructor.
    """
    return SpreadSheetSwarm(
        *args,
        name=self.name,
        description=self.description,
        agents=self.agents,
        max_loops=self.max_loops,
        autosave_on=self.autosave,
        **kwargs,
    )
def _create_sequential_workflow(self, *args, **kwargs):
"""Factory function for SequentialWorkflow."""
return SequentialWorkflow(
@ -580,7 +564,7 @@ class SwarmRouter:
**kwargs: Arbitrary keyword arguments.
Returns:
Union[AgentRearrange, MixtureOfAgents, SpreadSheetSwarm, SequentialWorkflow, ConcurrentWorkflow]:
Union[AgentRearrange, MixtureOfAgents, SequentialWorkflow, ConcurrentWorkflow]:
The instantiated swarm object.
Raises:

@ -397,7 +397,7 @@ class Formatter:
def print_agent_dashboard(
self,
agents_data: List[Dict[str, Any]],
title: str = "🤖 Agent Dashboard",
title: str = "ConcurrentWorkflow Dashboard",
is_final: bool = False,
) -> None:
"""

@ -7,7 +7,7 @@ from loguru import logger
from swarm_models import OpenAIChat
from swarms.structs.agent import Agent
from swarms.structs.rearrange import AgentRearrange
from swarms.structs.agent_rearrange import AgentRearrange
class TestResult:

@ -1,216 +0,0 @@
from swarms.structs.matrix_swarm import AgentMatrix, AgentOutput
from swarms import Agent
def create_test_matrix(rows: int, cols: int) -> AgentMatrix:
    """Helper function to create a test agent matrix"""
    grid = []
    for r in range(rows):
        row_agents = []
        for c in range(cols):
            row_agents.append(
                Agent(
                    agent_name=f"TestAgent-{r}-{c}",
                    system_prompt="Test prompt",
                )
            )
        grid.append(row_agents)
    return AgentMatrix(grid)
def test_init():
    """Test AgentMatrix initialization"""
    # A valid 2x2 grid should construct cleanly.
    matrix = create_test_matrix(2, 2)
    assert isinstance(matrix, AgentMatrix)
    assert len(matrix.agents) == 2
    assert len(matrix.agents[0]) == 2

    # Non-Agent elements must be rejected.
    raised = False
    try:
        AgentMatrix([[1, 2], [3, 4]])
    except ValueError:
        raised = True
    assert raised, "Should raise ValueError"

    # An empty matrix must be rejected.
    raised = False
    try:
        AgentMatrix([])
    except ValueError:
        raised = True
    assert raised, "Should raise ValueError"
def test_transpose():
    """Test matrix transpose operation"""
    original = create_test_matrix(2, 3)
    flipped = original.transpose()

    # Dimensions swap: 2x3 becomes 3x2.
    assert len(flipped.agents) == 3
    assert len(flipped.agents[0]) == 2

    # Each agent at (r, c) must land at (c, r).
    for r in range(2):
        for c in range(3):
            assert (
                original.agents[r][c].agent_name
                == flipped.agents[c][r].agent_name
            )
def test_add():
    """Test matrix addition"""
    left = create_test_matrix(2, 2)
    right = create_test_matrix(2, 2)

    summed = left.add(right)
    assert len(summed.agents) == 2
    assert len(summed.agents[0]) == 2

    # Mismatched shapes must be rejected.
    wider = create_test_matrix(2, 3)
    raised = False
    try:
        left.add(wider)
    except ValueError:
        raised = True
    assert raised, "Should raise ValueError"
def test_scalar_multiply():
    """Test scalar multiplication"""
    base = create_test_matrix(2, 2)
    factor = 3
    product = base.scalar_multiply(factor)

    # Row count is unchanged; each row widens by the scalar factor.
    assert len(product.agents) == 2
    assert len(product.agents[0]) == 2 * factor

    # Every source agent appears `factor` consecutive times per row.
    for row_idx in range(len(product.agents)):
        row = product.agents[row_idx]
        for col_start in range(0, len(product.agents[0]), factor):
            source = base.agents[row_idx][col_start // factor]
            for offset in range(factor):
                assert (
                    row[col_start + offset].agent_name
                    == source.agent_name
                )
def test_multiply():
    """Test matrix multiplication"""
    lhs = create_test_matrix(2, 3)
    rhs = create_test_matrix(3, 2)
    queries = ["test query 1", "test query 2"]

    product = lhs.multiply(rhs, queries)

    # Result shape is (rows of lhs) x (cols of rhs).
    assert len(product) == 2
    assert len(product[0]) == 2

    # Every cell must be a well-formed AgentOutput.
    for row in product:
        for cell in row:
            assert isinstance(cell, AgentOutput)
            assert isinstance(cell.input_query, str)
            assert isinstance(cell.metadata, dict)
def test_subtract():
    """Test matrix subtraction"""
    minuend = create_test_matrix(2, 2)
    subtrahend = create_test_matrix(2, 2)

    difference = minuend.subtract(subtrahend)

    # Shape must be preserved by subtraction.
    assert len(difference.agents) == 2
    assert len(difference.agents[0]) == 2
def test_identity():
    """Test identity matrix creation.

    Verifies that diagonal elements come from the original matrix and
    that ALL off-diagonal cells are zero agents.
    """
    matrix = create_test_matrix(3, 3)
    identity = matrix.identity(3)

    assert len(identity.agents) == 3
    assert len(identity.agents[0]) == 3

    for i in range(3):
        # Diagonal elements are taken from the original matrix.
        assert (
            identity.agents[i][i].agent_name
            == matrix.agents[i][i].agent_name
        )
        # BUG FIX: this loop was previously dedented out of the row loop,
        # so off-diagonal cells were only checked for the leaked final
        # index (i == 2). Nesting it checks every (i, j) pair.
        for j in range(3):
            if i != j:
                assert identity.agents[i][j].agent_name.startswith(
                    "Zero-Agent"
                )
def test_determinant():
    """Test determinant calculation"""
    # Determinant of a 1x1 matrix.
    single = create_test_matrix(1, 1)
    assert single.determinant() is not None

    # Determinant of a 2x2 matrix.
    square = create_test_matrix(2, 2)
    assert square.determinant() is not None

    # Non-square matrices must be rejected.
    rectangular = create_test_matrix(2, 3)
    raised = False
    try:
        rectangular.determinant()
    except ValueError:
        raised = True
    assert raised, "Should raise ValueError"
def test_save_to_file(tmp_path):
    """Test saving matrix to file"""
    import json
    import os

    matrix = create_test_matrix(2, 2)
    target = os.path.join(tmp_path, "test_matrix.json")
    matrix.save_to_file(target)
    assert os.path.exists(target)

    # The serialized document must carry the full 2x2 agent grid.
    with open(target, "r") as handle:
        payload = json.load(handle)
    assert "agents" in payload
    assert "outputs" in payload
    assert len(payload["agents"]) == 2
    assert len(payload["agents"][0]) == 2
def run_all_tests():
    """Run all test functions, printing pass/fail for each.

    ``test_save_to_file`` needs a scratch directory (normally supplied
    by pytest's ``tmp_path`` fixture), so it is run separately with a
    ``tempfile``-backed directory instead of being silently skipped as
    it was before.
    """
    import tempfile

    test_functions = [
        test_init,
        test_transpose,
        test_add,
        test_scalar_multiply,
        test_multiply,
        test_subtract,
        test_identity,
        test_determinant,
    ]
    for test_func in test_functions:
        try:
            test_func()
            print(f"{test_func.__name__} passed")
        except AssertionError as e:
            print(f"{test_func.__name__} failed: {str(e)}")
        except Exception as e:
            print(
                f"{test_func.__name__} failed with exception: {str(e)}"
            )

    # Previously omitted from the suite because it requires a tmp dir.
    try:
        with tempfile.TemporaryDirectory() as scratch:
            test_save_to_file(scratch)
        print("test_save_to_file passed")
    except AssertionError as e:
        print(f"test_save_to_file failed: {str(e)}")
    except Exception as e:
        print(
            f"test_save_to_file failed with exception: {str(e)}"
        )
# Allow this module to be executed directly (outside pytest) to run the suite.
if __name__ == "__main__":
    run_all_tests()
Loading…
Cancel
Save