diff --git a/.gitignore b/.gitignore
index 18b6849c..89b0cdc7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,6 +12,7 @@ dataframe/
 static/generated
 runs
 Financial-Analysis-Agent_state.json
+experimental
 artifacts_five
 encryption
 errors
diff --git a/agent_showcase_example.py b/agent_showcase_example.py
new file mode 100644
index 00000000..b78abf81
--- /dev/null
+++ b/agent_showcase_example.py
@@ -0,0 +1,68 @@
+import os
+
+from swarms import Agent
+
+from swarm_models import OpenAIChat
+from swarms.structs.agents_available import showcase_available_agents
+
+# Get the OpenAI API key from the environment variable
+api_key = os.getenv("OPENAI_API_KEY")
+
+# Create an instance of the OpenAIChat class
+model = OpenAIChat(
+    api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
+)
+
+# Initialize the Claims Director agent
+director_agent = Agent(
+    agent_name="ClaimsDirector",
+    agent_description="Oversees and coordinates the medical insurance claims processing workflow",
+    system_prompt="""You are the Claims Director responsible for managing the medical insurance claims process.
+    Assign and prioritize tasks between claims processors and auditors. Ensure claims are handled efficiently
+    and accurately while maintaining compliance with insurance policies and regulations.""",
+    llm=model,
+    max_loops=1,
+    dashboard=False,
+    streaming_on=True,
+    verbose=True,
+    stopping_token="",
+    state_save_file_type="json",
+    saved_state_path="director_agent.json",
+)
+
+# Initialize Claims Processor agent
+processor_agent = Agent(
+    agent_name="ClaimsProcessor",
+    agent_description="Reviews and processes medical insurance claims, verifying coverage and eligibility",
+    system_prompt="""Review medical insurance claims for completeness and accuracy. Verify patient eligibility,
+    coverage details, and process claims according to policy guidelines. Flag any claims requiring special review.""",
+    llm=model,
+    max_loops=1,
+    dashboard=False,
+    streaming_on=True,
+    verbose=True,
+    stopping_token="",
+    state_save_file_type="json",
+    saved_state_path="processor_agent.json",
+)
+
+# Initialize Claims Auditor agent
+auditor_agent = Agent(
+    agent_name="ClaimsAuditor",
+    agent_description="Audits processed claims for accuracy and compliance with policies and regulations",
+    system_prompt="""Audit processed insurance claims for accuracy and compliance. Review claim decisions,
+    identify potential fraud or errors, and ensure all processing follows established guidelines and regulations.""",
+    llm=model,
+    max_loops=1,
+    dashboard=False,
+    streaming_on=True,
+    verbose=True,
+    stopping_token="",
+    state_save_file_type="json",
+    saved_state_path="auditor_agent.json",
+)
+
+# Create a list of agents
+agents = [director_agent, processor_agent, auditor_agent]
+
+print(showcase_available_agents(agents=agents))
diff --git a/auto_flow.py b/auto_flow.py
deleted file mode 100644
index 4a2f84c2..00000000
--- a/auto_flow.py
+++ /dev/null
@@ -1,125 +0,0 @@
-import os
-import json
-from pydantic import BaseModel, Field
-from swarm_models import OpenAIFunctionCaller
-from dotenv import load_dotenv
-from typing import Any, List
-
-load_dotenv()
-
-
-class Flow(BaseModel):
-    id: str = Field(
-        description="A unique identifier for the flow. This should be a short, descriptive name that captures the main purpose of the flow. Use - to separate words and make it lowercase."
-    )
-    plan: str = Field(
-        description="The comprehensive plan detailing how the flow will accomplish the given task. This should include the high-level strategy, key milestones, and expected outcomes. The plan should clearly articulate what the overall goal is, what success looks like, and how progress will be measured throughout execution."
-    )
-    failures_prediction: str = Field(
-        description="A thorough analysis of potential failure modes and mitigation strategies. This should identify technical risks, edge cases, error conditions, and possible points of failure in the flow. For each identified risk, include specific preventive measures, fallback approaches, and recovery procedures to ensure robustness and reliability."
-    )
-    rationale: str = Field(
-        description="The detailed reasoning and justification for why this specific flow design is optimal for the given task. This should explain the key architectural decisions, tradeoffs considered, alternatives evaluated, and why this approach best satisfies the requirements. Include both technical and business factors that influenced the design."
-    )
-    flow: str = Field(
-        description="The precise execution flow defining how agents interact and coordinate. Use -> to indicate sequential processing where one agent must complete before the next begins (e.g. agent1 -> agent2 -> agent3). Use , to indicate parallel execution where multiple agents can run simultaneously (e.g. agent1 -> agent2, agent3, agent4). The flow should clearly show the dependencies and parallelization opportunities between agents. You must only use the agent names provided in the task description do not make up new agent names and do not use any other formatting."
-    )
-
-
-class AgentRearrangeBuilder(BaseModel):
-    name: str = Field(
-        description="The name of the swarm. This should be a short, descriptive name that captures the main purpose of the flow."
-    )
-    description: str = Field(
-        description="A brief description of the swarm. This should be a concise summary of the main purpose of the swarm."
-    )
-    flows: List[Flow] = Field(
-        description="A list of flows that are optimal for the given task. Each flow should be a detailed plan, failure prediction, rationale, and execution flow."
-    )
-    swarm_flow: str = Field(
-        description="The flow defining how each team should communicate and coordinate with eachother.Use -> to indicate sequential processing where one id must complete before the next begins (e.g. team1 -> team2 -> team3). Use , to indicate parallel execution where multiple teams can run simultaneously (e.g. team1 -> team2, team3, team4). The flow should clearly show the dependencies and parallelization opportunities between teams. You must only use the team names provided in the id do not make up new team names and do not use any other formatting."
-    )
-
-
-# def flow_generator(task: str) -> Flow:
-
-
-def setup_model(base_model: BaseModel = Flow):
-    model = OpenAIFunctionCaller(
-        system_prompt="""You are an expert flow architect specializing in designing multi-agent workflows. Your role is to analyze tasks and create optimal execution flows that coordinate multiple AI agents effectively.
-
-        When given a task, you will:
-        1. Develop a comprehensive plan breaking down the task into logical steps
-        2. Carefully consider potential failure modes and build in robust error handling
-        3. Provide clear rationale for your architectural decisions and agent coordination strategy
-        4. Design a precise flow showing both sequential dependencies and parallel execution opportunities
-
-        Your flows should maximize:
-        - Efficiency through smart parallelization
-        - Reliability through thorough error handling
-        - Clarity through well-structured agent interactions
-        - Effectiveness through strategic task decomposition
-
-        Format your flow using -> for sequential steps and , for parallel execution. Be specific about agent roles and interactions.
-        """,
-        base_model=base_model,
-        openai_api_key=os.getenv("OPENAI_API_KEY"),
-        temperature=0.5,
-    )
-    return model
-
-
-def generate_flow(task: str) -> Any:
-    model = setup_model()
-    flow = model.run(task)
-    print(json.dumps(flow, indent=4))
-    return flow
-
-
-def generate_agent_rearrange(task: str) -> Any:
-    model = setup_model(base_model=AgentRearrangeBuilder)
-    flow = model.run(task)
-    print(json.dumps(flow, indent=4))
-    return flow
-
-
-if __name__ == "__main__":
-    # Basic patient diagnosis flow
-    # generate_flow("Diagnose a patient's symptoms and create a treatment plan. You have 3 agents to use: Diagnostician, Specialist, CareCoordinator")
-
-    # # Complex multi-condition case
-    # generate_flow("""Handle a complex patient case with multiple chronic conditions requiring ongoing care coordination.
-    #     The patient has diabetes, heart disease, and chronic pain.
-    #     Create a comprehensive diagnosis and treatment plan.
-    #     You have 3 agents to use: Diagnostician, Specialist, CareCoordinator""")
-
-    # # Emergency trauma case
-    # generate_flow("""Process an emergency trauma case requiring rapid diagnosis and immediate intervention.
-    #     Patient presents with multiple injuries from a car accident.
-    #     Develop immediate and long-term treatment plans.
-    #     You have 3 agents to use: Diagnostician, Specialist, CareCoordinator""")
-
-    # # Long-term care planning
-    # generate_flow("""Design a 6-month care plan for an elderly patient with declining cognitive function.
-    #     Include regular assessments, specialist consultations, and family coordination.
-    #     You have 3 agents to use: Diagnostician, Specialist, CareCoordinator""")
-
-    # # Mental health assessment
-    # generate_flow("""Conduct a comprehensive mental health assessment and develop treatment strategy.
-    #     Patient shows signs of depression and anxiety with possible underlying conditions.
-    #     Create both immediate intervention and long-term support plans.
-    #     You have 3 agents to use: Diagnostician, Specialist, CareCoordinator""")
-
-    generate_agent_rearrange(
-        """Build a complete automated hedge fund system.
-        Design and implement a sophisticated trading strategy incorporating multiple asset classes,
-        risk management protocols, and automated execution systems.
-        The system should include:
-        - Market analysis and research capabilities
-        - Portfolio optimization and risk management
-        - Automated trade execution and settlement
-        - Compliance and regulatory monitoring
-        - Performance tracking and reporting
-        - Fund operations and administration
-        Create a comprehensive architecture that integrates all these components into a fully automated system."""
-    )
diff --git a/auto_swarm_router.py b/auto_swarm_router.py
index 8a692454..41a3badd 100644
--- a/auto_swarm_router.py
+++ b/auto_swarm_router.py
@@ -16,51 +16,12 @@ model = OpenAIChat(
     model_name="llama-3.1-70b-versatile",
     temperature=0.1,
 )
-# Define specialized system prompts for each agent
-DATA_EXTRACTOR_PROMPT = """You are a highly specialized private equity agent focused on data extraction from various documents. Your expertise includes:
-1. Extracting key financial metrics (revenue, EBITDA, growth rates, etc.) from financial statements and reports
-2. Identifying and extracting important contract terms from legal documents
-3. Pulling out relevant market data from industry reports and analyses
-4. Extracting operational KPIs from management presentations and internal reports
-5. Identifying and extracting key personnel information from organizational charts and bios
-Provide accurate, structured data extracted from various document types to support investment analysis."""
 
-SUMMARIZER_PROMPT = """You are an expert private equity agent specializing in summarizing complex documents. Your core competencies include:
-1. Distilling lengthy financial reports into concise executive summaries
-2. Summarizing legal documents, highlighting key terms and potential risks
-3. Condensing industry reports to capture essential market trends and competitive dynamics
-4. Summarizing management presentations to highlight key strategic initiatives and projections
-5. Creating brief overviews of technical documents, emphasizing critical points for non-technical stakeholders
-Deliver clear, concise summaries that capture the essence of various documents while highlighting information crucial for investment decisions."""
-
-FINANCIAL_ANALYST_PROMPT = """You are a specialized private equity agent focused on financial analysis. Your key responsibilities include:
-1. Analyzing historical financial statements to identify trends and potential issues
-2. Evaluating the quality of earnings and potential adjustments to EBITDA
-3. Assessing working capital requirements and cash flow dynamics
-4. Analyzing capital structure and debt capacity
-5. Evaluating financial projections and underlying assumptions
-Provide thorough, insightful financial analysis to inform investment decisions and valuation."""
-
-MARKET_ANALYST_PROMPT = """You are a highly skilled private equity agent specializing in market analysis. Your expertise covers:
-1. Analyzing industry trends, growth drivers, and potential disruptors
-2. Evaluating competitive landscape and market positioning
-3. Assessing market size, segmentation, and growth potential
-4. Analyzing customer dynamics, including concentration and loyalty
-5. Identifying potential regulatory or macroeconomic impacts on the market
-Deliver comprehensive market analysis to assess the attractiveness and risks of potential investments."""
-
-OPERATIONAL_ANALYST_PROMPT = """You are an expert private equity agent focused on operational analysis. Your core competencies include:
-1. Evaluating operational efficiency and identifying improvement opportunities
-2. Analyzing supply chain and procurement processes
-3. Assessing sales and marketing effectiveness
-4. Evaluating IT systems and digital capabilities
-5. Identifying potential synergies in merger or add-on acquisition scenarios
-Provide detailed operational analysis to uncover value creation opportunities and potential risks."""
 
 # Initialize specialized agents
 data_extractor_agent = Agent(
     agent_name="Data-Extractor",
-    system_prompt=DATA_EXTRACTOR_PROMPT,
+    system_prompt="You are a data extraction specialist. Extract relevant information from provided content.",
     llm=model,
     max_loops=1,
     autosave=True,
@@ -75,7 +36,7 @@ data_extractor_agent = Agent(
 
 summarizer_agent = Agent(
     agent_name="Document-Summarizer",
-    system_prompt=SUMMARIZER_PROMPT,
+    system_prompt="You are a document summarization specialist. Provide clear and concise summaries.",
     llm=model,
     max_loops=1,
     autosave=True,
@@ -90,7 +51,7 @@ summarizer_agent = Agent(
 
 financial_analyst_agent = Agent(
     agent_name="Financial-Analyst",
-    system_prompt=FINANCIAL_ANALYST_PROMPT,
+    system_prompt="You are a financial analysis specialist. Analyze financial aspects of content.",
     llm=model,
     max_loops=1,
     autosave=True,
@@ -105,7 +66,7 @@ financial_analyst_agent = Agent(
 
 market_analyst_agent = Agent(
     agent_name="Market-Analyst",
-    system_prompt=MARKET_ANALYST_PROMPT,
+    system_prompt="You are a market analysis specialist. Analyze market-related aspects.",
     llm=model,
     max_loops=1,
     autosave=True,
@@ -120,7 +81,7 @@ market_analyst_agent = Agent(
 
 operational_analyst_agent = Agent(
     agent_name="Operational-Analyst",
-    system_prompt=OPERATIONAL_ANALYST_PROMPT,
+    system_prompt="You are an operational analysis specialist. Analyze operational aspects.",
     llm=model,
     max_loops=1,
     autosave=True,
@@ -141,12 +102,13 @@ router = SwarmRouter(
     agents=[
         data_extractor_agent,
         summarizer_agent,
-        # financial_analyst_agent,
-        # market_analyst_agent,
-        # operational_analyst_agent,
+        financial_analyst_agent,
+        market_analyst_agent,
+        operational_analyst_agent,
     ],
-    swarm_type="auto", # or "SequentialWorkflow" or "ConcurrentWorkflow" or
-    # auto_generate_prompts=True,
+    swarm_type="SequentialWorkflow",  # or "auto", "ConcurrentWorkflow", etc.
+    auto_generate_prompts=True,
+    output_type="all",
 )
 
 # Example usage
@@ -156,7 +119,3 @@ if __name__ == "__main__":
         "Where is the best place to find template term sheets for series A startups. Provide links and references"
     )
     print(result)
-
-    # Retrieve and print logs
-    for log in router.get_logs():
-        print(f"{log.timestamp} - {log.level}: {log.message}")
diff --git a/concurrent_mix.py b/concurrent_mix.py
new file mode 100644
index 00000000..5ac80ede
--- /dev/null
+++ b/concurrent_mix.py
@@ -0,0 +1,99 @@
+import os
+
+from swarm_models import OpenAIChat
+
+from swarms import Agent, run_agents_with_tasks_concurrently
+
+# Fetch the OpenAI API key from the environment variable
+api_key = os.getenv("OPENAI_API_KEY")
+
+# Create an instance of the OpenAIChat class
+model = OpenAIChat(
+    openai_api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
+)
+
+# Initialize agents for different roles
+delaware_ccorp_agent = Agent(
+    agent_name="Delaware-CCorp-Hiring-Agent",
+    system_prompt="""
+    Create a comprehensive hiring description for a Delaware C Corporation,
+    including all relevant laws and regulations, such as the Delaware General
+    Corporation Law (DGCL) and the Delaware Corporate Law. Ensure the description
+    covers the requirements for hiring employees, contractors, and officers,
+    including the necessary paperwork, tax obligations, and benefits. Also,
+    outline the procedures for compliance with Delaware's employment laws,
+    including anti-discrimination laws, workers' compensation, and unemployment
+    insurance. Provide guidance on how to navigate the complexities of Delaware's
+    corporate law and ensure that all hiring practices are in compliance with
+    state and federal regulations.
+    """,
+    llm=model,
+    max_loops=1,
+    autosave=False,
+    dashboard=False,
+    verbose=True,
+    output_type="str",
+    artifacts_on=True,
+    artifacts_output_path="delaware_ccorp_hiring_description.md",
+    artifacts_file_extension=".md",
+)
+
+indian_foreign_agent = Agent(
+    agent_name="Indian-Foreign-Hiring-Agent",
+    system_prompt="""
+    Create a comprehensive hiring description for an Indian or foreign country,
+    including all relevant laws and regulations, such as the Indian Contract Act,
+    the Indian Labour Laws, and the Foreign Exchange Management Act (FEMA).
+    Ensure the description covers the requirements for hiring employees,
+    contractors, and officers, including the necessary paperwork, tax obligations,
+    and benefits. Also, outline the procedures for compliance with Indian and
+    foreign employment laws, including anti-discrimination laws, workers'
+    compensation, and unemployment insurance. Provide guidance on how to navigate
+    the complexities of Indian and foreign corporate law and ensure that all hiring
+    practices are in compliance with state and federal regulations. Consider the
+    implications of hiring foreign nationals and the requirements for obtaining
+    necessary visas and work permits.
+    """,
+    llm=model,
+    max_loops=1,
+    autosave=False,
+    dashboard=False,
+    verbose=True,
+    output_type="str",
+    artifacts_on=True,
+    artifacts_output_path="indian_foreign_hiring_description.md",
+    artifacts_file_extension=".md",
+)
+
+# List of agents and corresponding tasks
+agents = [delaware_ccorp_agent, indian_foreign_agent]
+tasks = [
+    """
+    Create a comprehensive hiring description for an Agent Engineer, including
+    required skills and responsibilities. Ensure the description covers the
+    necessary technical expertise, such as proficiency in AI/ML frameworks,
+    programming languages, and data structures. Outline the key responsibilities,
+    including designing and developing AI agents, integrating with existing systems,
+    and ensuring scalability and performance.
+    """,
+    """
+    Generate a detailed job description for a Prompt Engineer, including
+    required skills and responsibilities. Ensure the description covers the
+    necessary technical expertise, such as proficiency in natural language processing,
+    machine learning, and software development. Outline the key responsibilities,
+    including designing and optimizing prompts for AI systems, ensuring prompt
+    quality and consistency, and collaborating with cross-functional teams.
+    """,
+]
+
+# Run agents with tasks concurrently
+results = run_agents_with_tasks_concurrently(
+    agents,
+    tasks,
+    all_cores=True,
+    device="cpu",
+)
+
+# Print the results
+for result in results:
+    print(result)
diff --git a/6_0_0.md b/docs/swarms/changelog/6_0_0.md
similarity index 100%
rename from 6_0_0.md
rename to docs/swarms/changelog/6_0_0.md
diff --git a/example.py b/example.py
index 8f6f22da..8165b767 100644
--- a/example.py
+++ b/example.py
@@ -43,8 +43,7 @@ agent = Agent(
 )
 
 
-print(
-    agent.run(
-        "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria. Create a report on this question."
-    )
+agent.run(
+    "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria. Create a report on this question.",
+    all_cores=True,
 )
diff --git a/pyproject.toml b/pyproject.toml
index 24cd0922..741b86ef 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
 
 [tool.poetry]
 name = "swarms"
-version = "6.0.4"
+version = "6.0.9"
description = "Swarms - Pytorch"
license = "MIT"
authors = ["Kye Gomez "]
@@ -78,6 +78,7 @@ swarm-models = "*"
 clusterops = "*"
 chromadb = "*"
 reportlab = "*"
+doc-master = "*"
 
 [tool.poetry.scripts]
 swarms = "swarms.cli.main:main"
diff --git a/agent_with_rag.py b/rag_examples/agent_with_rag.py
similarity index 100%
rename from agent_with_rag.py
rename to rag_examples/agent_with_rag.py
diff --git a/agent_with_rag_and_tools.py b/rag_examples/agent_with_rag_and_tools.py
similarity index 100%
rename from agent_with_rag_and_tools.py
rename to rag_examples/agent_with_rag_and_tools.py
diff --git a/rearrange_test.py b/rearrange_test.py
new file mode 100644
index 00000000..ddfd7670
--- /dev/null
+++ b/rearrange_test.py
@@ -0,0 +1,119 @@
+import os
+
+from swarms import Agent, AgentRearrange
+
+from swarm_models import OpenAIChat
+
+# Get the OpenAI API key from the environment variable
+api_key = os.getenv("OPENAI_API_KEY")
+
+# Create an instance of the OpenAIChat class
+model = OpenAIChat(
+    api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
+)
+
+
+# Initialize the boss agent (Director)
+boss_agent = Agent(
+    agent_name="BossAgent",
+    system_prompt="""
+    You are the BossAgent responsible for managing and overseeing a swarm of agents analyzing company expenses.
+    Your job is to dynamically assign tasks, prioritize their execution, and ensure that all agents collaborate efficiently.
+    After receiving a report on the company's expenses, you will break down the work into smaller tasks,
+    assigning specific tasks to each agent, such as detecting recurring high costs, categorizing expenditures,
+    and identifying unnecessary transactions. Ensure the results are communicated back in a structured way
+    so the finance team can take actionable steps to cut off unproductive spending. You also monitor and
+    dynamically adapt the swarm to optimize their performance. Finally, you summarize their findings
+    into a coherent report.
+    """,
+    llm=model,
+    max_loops=1,
+    dashboard=False,
+    streaming_on=True,
+    verbose=True,
+    stopping_token="",
+    state_save_file_type="json",
+    saved_state_path="boss_agent.json",
+)
+
+# Initialize worker 1: Expense Analyzer
+worker1 = Agent(
+    agent_name="ExpenseAnalyzer",
+    system_prompt="""
+    Your task is to carefully analyze the company's expense data provided to you.
+    You will focus on identifying high-cost recurring transactions, categorizing expenditures
+    (e.g., marketing, operations, utilities, etc.), and flagging areas where there seems to be excessive spending.
+    You will provide a detailed breakdown of each category, along with specific recommendations for cost-cutting.
+    Pay close attention to monthly recurring subscriptions, office supplies, and non-essential expenditures.
+    """,
+    llm=model,
+    max_loops=1,
+    dashboard=False,
+    streaming_on=True,
+    verbose=True,
+    stopping_token="",
+    state_save_file_type="json",
+    saved_state_path="worker1.json",
+)
+
+# Initialize worker 2: Summary Generator
+worker2 = Agent(
+    agent_name="SummaryGenerator",
+    system_prompt="""
+    After receiving the detailed breakdown from the ExpenseAnalyzer,
+    your task is to create a concise summary of the findings. You will focus on the most actionable insights,
+    such as highlighting the specific transactions that can be immediately cut off and summarizing the areas
+    where the company is overspending. Your summary will be used by the BossAgent to generate the final report.
+    Be clear and to the point, emphasizing the urgency of cutting unnecessary expenses.
+    """,
+    llm=model,
+    max_loops=1,
+    dashboard=False,
+    streaming_on=True,
+    verbose=True,
+    stopping_token="",
+    state_save_file_type="json",
+    saved_state_path="worker2.json",
+)
+
+# Swarm-Level Prompt (Collaboration Prompt)
+swarm_prompt = """
+    As a swarm, your collective goal is to analyze the company's expenses and identify transactions that should be cut off.
+    You will work collaboratively to break down the entire process of expense analysis into manageable steps.
+    The BossAgent will direct the flow and assign tasks dynamically to the agents. The ExpenseAnalyzer will first
+    focus on breaking down the expense report, identifying high-cost recurring transactions, categorizing them,
+    and providing recommendations for potential cost reduction. After the analysis, the SummaryGenerator will then
+    consolidate all the findings into an actionable summary that the finance team can use to immediately cut off unnecessary expenses.
+    Together, your collaboration is essential to streamlining and improving the company’s financial health.
+"""
+
+# Create a list of agents
+agents = [boss_agent, worker1, worker2]
+
+# Define the flow pattern for the swarm
+flow = "BossAgent -> ExpenseAnalyzer -> SummaryGenerator"
+
+# Using AgentRearrange class to manage the swarm
+agent_system = AgentRearrange(
+    agents=agents,
+    flow=flow,
+    return_json=False,
+    output_type="final",
+    max_loops=1,
+    docs=["SECURITY.md"],
+)
+
+# Input task for the swarm
+task = f"""
+
+    {swarm_prompt}
+
+    The company has been facing a rising number of unnecessary expenses, and the finance team needs a detailed
+    analysis of recent transactions to identify which expenses can be cut off to improve profitability.
+    Analyze the provided transaction data and create a detailed report on cost-cutting opportunities,
+    focusing on recurring transactions and non-essential expenditures.
+"""
+
+# Run the swarm system with the task
+output = agent_system.run(task)
+print(output)
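Reviewer note on the flow string above: AgentRearrange validates a small flow grammar (see swarms/structs/rearrange.py later in this diff). A minimal sketch of the parallel form, reusing the agent names from rearrange_test.py:

# "->" separates sequential stages; "," runs the agents of one stage in parallel
flow = "BossAgent -> ExpenseAnalyzer, SummaryGenerator"
# Both workers receive the BossAgent output concurrently; per _run() below,
# their results are joined with "; " before any following stage.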
diff --git a/requirements.txt b/requirements.txt
index 8f9df9b9..092dce58 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -35,3 +35,4 @@ aiofiles
 swarm-models
 clusterops
 reportlab
+doc-master
diff --git a/sequential_worflow_test.py b/sequential_worflow_test.py
new file mode 100644
index 00000000..654154c6
--- /dev/null
+++ b/sequential_worflow_test.py
@@ -0,0 +1,117 @@
+import os
+from dotenv import load_dotenv
+from swarms import Agent, SequentialWorkflow
+from swarm_models import OpenAIChat
+
+load_dotenv()
+
+# Get the Groq API key from the environment variable
+api_key = os.getenv("GROQ_API_KEY")
+
+# Model
+model = OpenAIChat(
+    openai_api_base="https://api.groq.com/openai/v1",
+    openai_api_key=api_key,
+    model_name="llama-3.1-70b-versatile",
+    temperature=0.1,
+)
+
+
+# Initialize specialized agents
+data_extractor_agent = Agent(
+    agent_name="Data-Extractor",
+    system_prompt=None,
+    llm=model,
+    max_loops=1,
+    autosave=True,
+    verbose=True,
+    dynamic_temperature_enabled=True,
+    saved_state_path="data_extractor_agent.json",
+    user_name="pe_firm",
+    retry_attempts=1,
+    context_length=200000,
+    output_type="string",
+)
+
+summarizer_agent = Agent(
+    agent_name="Document-Summarizer",
+    system_prompt=None,
+    llm=model,
+    max_loops=1,
+    autosave=True,
+    verbose=True,
+    dynamic_temperature_enabled=True,
+    saved_state_path="summarizer_agent.json",
+    user_name="pe_firm",
+    retry_attempts=1,
+    context_length=200000,
+    output_type="string",
+)
+
+financial_analyst_agent = Agent(
+    agent_name="Financial-Analyst",
+    system_prompt=None,
+    llm=model,
+    max_loops=1,
+    autosave=True,
+    verbose=True,
+    dynamic_temperature_enabled=True,
+    saved_state_path="financial_analyst_agent.json",
+    user_name="pe_firm",
+    retry_attempts=1,
+    context_length=200000,
+    output_type="string",
+)
+
+market_analyst_agent = Agent(
+    agent_name="Market-Analyst",
+    system_prompt=None,
+    llm=model,
+    max_loops=1,
+    autosave=True,
+    verbose=True,
+    dynamic_temperature_enabled=True,
+    saved_state_path="market_analyst_agent.json",
+    user_name="pe_firm",
+    retry_attempts=1,
+    context_length=200000,
+    output_type="string",
+)
+
+operational_analyst_agent = Agent(
+    agent_name="Operational-Analyst",
+    system_prompt=None,
+    llm=model,
+    max_loops=1,
+    autosave=True,
+    verbose=True,
+    dynamic_temperature_enabled=True,
+    saved_state_path="operational_analyst_agent.json",
+    user_name="pe_firm",
+    retry_attempts=1,
+    context_length=200000,
+    output_type="string",
+)
+
+# Initialize the SequentialWorkflow
+router = SequentialWorkflow(
+    name="pe-document-analysis-swarm",
+    description="Analyze documents for private equity due diligence and investment decision-making",
+    max_loops=1,
+    agents=[
+        data_extractor_agent,
+        summarizer_agent,
+        financial_analyst_agent,
+        market_analyst_agent,
+        operational_analyst_agent,
+    ],
+    output_type="all",
+)
+
+# Example usage
+if __name__ == "__main__":
+    # Run a comprehensive private equity document analysis task
+    result = router.run(
+        "Where is the best place to find template term sheets for series A startups. Provide links and references"
+    )
+    print(result)
diff --git a/swarms/agents/create_agents_from_yaml.py b/swarms/agents/create_agents_from_yaml.py
index 85371283..ef1e8f18 100644
--- a/swarms/agents/create_agents_from_yaml.py
+++ b/swarms/agents/create_agents_from_yaml.py
@@ -115,6 +115,15 @@ def create_agents_from_yaml(
             auto_generate_prompt=agent_config.get(
                 "auto_generate_prompt", "False"
             ),
+            artifacts_on=agent_config.get(
+                "artifacts_on", "False"
+            ),
+            artifacts_file_extension=agent_config.get(
+                "artifacts_file_extension", ".md"
+            ),
+            artifacts_output_path=agent_config.get(
+                "artifacts_output_path", ""
+            ),
             *args,
             **kwargs,
         )
@@ -138,7 +147,8 @@ def create_agents_from_yaml(
             flow=swarm_config.get("flow"),
             autosave=swarm_config.get("autosave"),
             return_json=swarm_config.get("return_json"),
-            *args,
+            rules=swarm_config.get("rules", ""),
+            *args,
             **kwargs,
         )
         logger.info(
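Reviewer note: the three artifacts_* keys above extend the YAML agent schema read by create_agents_from_yaml. A minimal sketch of an agent entry using them; only auto_generate_prompt and the artifacts_* keys are taken from this diff, the surrounding fields are illustrative and assume the existing schema:

agents:
  - agent_name: "Report-Writer"
    system_prompt: "Write a short report on the given topic."
    max_loops: 1
    auto_generate_prompt: "False"
    artifacts_on: "True"
    artifacts_file_extension: ".md"
    artifacts_output_path: "reports"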
diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py
index a9b77dae..05b74bc6 100644
--- a/swarms/structs/__init__.py
+++ b/swarms/structs/__init__.py
@@ -74,7 +74,9 @@ from swarms.structs.multi_agent_exec import (
     run_agents_with_different_tasks,
     run_agent_with_timeout,
     run_agents_with_resource_monitoring,
+    run_agents_with_tasks_concurrently,
 )
+from swarms.structs.agents_available import showcase_available_agents
 
 __all__ = [
     "Agent",
@@ -143,4 +145,6 @@ __all__ = [
     "run_agent_with_timeout",
     "run_agents_with_resource_monitoring",
     "swarm_router",
+    "run_agents_with_tasks_concurrently",
+    "showcase_available_agents",
 ]
diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py
index 71ce06c3..696193be 100644
--- a/swarms/structs/agent.py
+++ b/swarms/structs/agent.py
@@ -299,7 +299,6 @@ class Agent:
         rules: str = None,  # type: ignore
         planning: Optional[str] = False,
         planning_prompt: Optional[str] = None,
-        device: str = None,
         custom_planning_prompt: str = None,
         memory_chunk_size: int = 2000,
         agent_ops_on: bool = False,
@@ -331,6 +330,9 @@ class Agent:
         artifacts_on: bool = False,
         artifacts_output_path: str = None,
         artifacts_file_extension: str = None,
+        device: str = "cpu",
+        all_cores: bool = True,
+        device_id: int = 0,
         *args,
         **kwargs,
     ):
@@ -408,7 +410,6 @@ class Agent:
         self.execute_tool = execute_tool
         self.planning = planning
         self.planning_prompt = planning_prompt
-        self.device = device
         self.custom_planning_prompt = custom_planning_prompt
         self.rules = rules
         self.custom_tools_prompt = custom_tools_prompt
@@ -441,6 +442,9 @@ class Agent:
         self.artifacts_on = artifacts_on
         self.artifacts_output_path = artifacts_output_path
         self.artifacts_file_extension = artifacts_file_extension
+        self.device = device
+        self.all_cores = all_cores
+        self.device_id = device_id
 
         # Initialize the short term memory
         self.short_memory = Conversation(
@@ -729,16 +733,16 @@ class Agent:
     # Check parameters
     def check_parameters(self):
         if self.llm is None:
-            raise ValueError("Language model is not provided")
+            raise ValueError("Language model is not provided. Choose a model from the available models in swarm_models, or create a class with a run(task: str) method and/or a __call__ method.")
 
-        if self.max_loops is None:
+        if self.max_loops is None or self.max_loops == 0:
             raise ValueError("Max loops is not provided")
 
-        if self.max_tokens == 0:
+        if self.max_tokens == 0 or self.max_tokens is None:
             raise ValueError("Max tokens is not provided")
 
-        if self.context_length == 0:
+        if self.context_length == 0 or self.context_length is None:
             raise ValueError("Context length is not provided")
 
     # Main function
     def _run(
@@ -2264,6 +2270,9 @@ class Agent:
             ValueError: If an invalid device is specified.
             Exception: If any other error occurs during execution.
         """
+        device = device or self.device
+        device_id = device_id or self.device_id
+
         try:
             logger.info(f"Attempting to run on device: {device}")
             if device == "cpu":
diff --git a/swarms/structs/agents_available.py b/swarms/structs/agents_available.py
new file mode 100644
index 00000000..0ed63c5a
--- /dev/null
+++ b/swarms/structs/agents_available.py
@@ -0,0 +1,93 @@
+from typing import List, Any
+from loguru import logger
+from swarms.structs.agent import Agent
+
+
+def get_agent_name(agent: Any) -> str:
+    """Helper function to safely get agent name
+
+    Args:
+        agent (Any): The agent object to get name from
+
+    Returns:
+        str: The agent's name if found, 'Unknown' otherwise
+    """
+    if isinstance(agent, Agent) and hasattr(agent, "agent_name"):
+        return agent.agent_name
+    return "Unknown"
+
+
+def get_agent_description(agent: Any) -> str:
+    """Helper function to get agent description or system prompt preview
+
+    Args:
+        agent (Any): The agent object
+
+    Returns:
+        str: Description, or the first 150 chars of the system prompt
+    """
+    if not isinstance(agent, Agent):
+        return "N/A"
+
+    if hasattr(agent, "description") and agent.description:
+        return agent.description
+
+    if hasattr(agent, "system_prompt") and agent.system_prompt:
+        return f"{agent.system_prompt[:150]}..."
+
+    return "N/A"
+
+
+def showcase_available_agents(
+    name: str = None,
+    description: str = None,
+    agents: List[Agent] = [],
+    update_agents_on: bool = False,
+) -> str:
+    """
+    Generate a formatted string showcasing all available agents and their descriptions.
+
+    Args:
+        name (str, optional): Name of the swarm, shown in the header.
+        description (str, optional): Description of the swarm, shown in the header.
+        agents (List[Agent]): List of Agent objects to showcase.
+        update_agents_on (bool, optional): If True, updates each agent's system prompt with
+            the showcase information. Defaults to False.
+
+    Returns:
+        str: Formatted string containing agent information, including names, descriptions
+            and IDs for all available agents.
+    """
+    logger.info(f"Showcasing {len(agents)} available agents")
+
+    formatted_agents = []
+    header = f"\n####### Agents available in the swarm: {name} ############\n"
+    header += f"{description}\n"
+    row_format = "{:<5} | {:<20} | {:<50}"
+    header_row = row_format.format("ID", "Agent Name", "Description")
+    separator = "-" * 80
+
+    formatted_agents.append(header)
+    formatted_agents.append(separator)
+    formatted_agents.append(header_row)
+    formatted_agents.append(separator)
+
+    for idx, agent in enumerate(agents):
+        if not isinstance(agent, Agent):
+            logger.warning(
+                f"Skipping non-Agent object: {type(agent)}"
+            )
+            continue
+
+        agent_name = get_agent_name(agent)
+        description = (
+            get_agent_description(agent)[:100] + "..."
+            if len(get_agent_description(agent)) > 100
+            else get_agent_description(agent)
+        )
+
+        formatted_agents.append(
+            row_format.format(idx + 1, agent_name, description)
+        )
+
+    showcase = "\n".join(formatted_agents)
+
+    return showcase
diff --git a/auto_swarm_builder.py b/swarms/structs/auto_swarm_builder.py
similarity index 100%
rename from auto_swarm_builder.py
rename to swarms/structs/auto_swarm_builder.py
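Reviewer note: the Agent constructor now accepts device, all_cores, and device_id (see the agent.py hunks above), and _run falls back to the stored values when its own device arguments are None. A minimal sketch, with the agent name illustrative and the model set up as in the examples earlier in this diff:

agent = Agent(
    agent_name="Device-Aware-Agent",
    llm=model,
    max_loops=1,
    device="cpu",    # stored on the agent; used when run() receives no device
    all_cores=True,  # request all CPU cores
    device_id=0,     # GPU index, only relevant when device="gpu"
)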
diff --git a/swarms/structs/multi_agent_exec.py b/swarms/structs/multi_agent_exec.py
index e32a4edc..c95b7c7a 100644
--- a/swarms/structs/multi_agent_exec.py
+++ b/swarms/structs/multi_agent_exec.py
@@ -5,10 +5,13 @@ from dataclasses import dataclass
 import threading
 from typing import List, Union, Any, Callable
 from multiprocessing import cpu_count
-
+import os
 from swarms.structs.agent import Agent
 from swarms.utils.calculate_func_metrics import profile_func
+from swarms.utils.wrapper_clusterop import (
+    exec_callable_with_clusterops,
+)
 
 
 # Type definitions
@@ -332,6 +335,103 @@ def run_agents_with_resource_monitoring(
     # Implementation details...
 
 
+@profile_func
+def _run_agents_with_tasks_concurrently(
+    agents: List[AgentType],
+    tasks: List[str] = [],
+    batch_size: int = None,
+    max_workers: int = None,
+) -> List[Any]:
+    """
+    Run multiple agents with corresponding tasks concurrently.
+
+    Args:
+        agents: List of Agent instances to run
+        tasks: List of task strings to execute
+        batch_size: Number of agents to run in parallel
+        max_workers: Maximum number of threads
+
+    Returns:
+        List of outputs from each agent
+    """
+    if len(agents) != len(tasks):
+        raise ValueError(
+            "The number of agents must match the number of tasks."
+        )
+
+    cpu_cores = os.cpu_count()
+    batch_size = batch_size or cpu_cores
+    max_workers = max_workers or cpu_cores * 2
+    results = []
+
+    try:
+        loop = asyncio.get_event_loop()
+    except RuntimeError:
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+
+    async def run_agent_task_pair(
+        agent: AgentType, task: str, executor: ThreadPoolExecutor
+    ) -> Any:
+        return await run_agent_async(agent, task, executor)
+
+    with ThreadPoolExecutor(max_workers=max_workers) as executor:
+        for i in range(0, len(agents), batch_size):
+            batch_agents = agents[i : i + batch_size]
+            batch_tasks = tasks[i : i + batch_size]
+            batch_results = loop.run_until_complete(
+                asyncio.gather(
+                    *(
+                        run_agent_task_pair(agent, task, executor)
+                        for agent, task in zip(
+                            batch_agents, batch_tasks
+                        )
+                    )
+                )
+            )
+            results.extend(batch_results)
+
+    return results
+
+
+def run_agents_with_tasks_concurrently(
+    agents: List[AgentType],
+    tasks: List[str] = [],
+    batch_size: int = None,
+    max_workers: int = None,
+    device: str = "cpu",
+    device_id: int = 0,
+    all_cores: bool = True,
+) -> List[Any]:
+    """
+    Executes a list of agents with their corresponding tasks concurrently on a specified device.
+
+    This function orchestrates the concurrent execution of a list of agents with their respective tasks on a specified device, either CPU or GPU. It leverages the `exec_callable_with_clusterops` function to manage the execution on the specified device.
+
+    Args:
+        agents (List[AgentType]): A list of Agent instances or callable functions to execute concurrently.
+        tasks (List[str], optional): A list of task strings to execute for each agent. Defaults to an empty list.
+        batch_size (int, optional): The number of agents to run in parallel. Defaults to None.
+        max_workers (int, optional): The maximum number of threads to use for execution. Defaults to None.
+        device (str, optional): The device to use for execution. Defaults to "cpu".
+        device_id (int, optional): The ID of the GPU to use if device is set to "gpu". Defaults to 0.
+        all_cores (bool, optional): If True, uses all available CPU cores. Defaults to True.
+
+    Returns:
+        List[Any]: A list of outputs from each agent execution.
+    """
+    return exec_callable_with_clusterops(
+        device,
+        device_id,
+        all_cores,
+        _run_agents_with_tasks_concurrently,
+        agents,
+        tasks,
+        batch_size,
+        max_workers,
+    )
+
+
 # # Example usage:
 # # Initialize your agents with the same model to avoid re-creating it
 # agents = [
diff --git a/swarms/structs/rearrange.py b/swarms/structs/rearrange.py
index 225eeb98..01c0f7b5 100644
--- a/swarms/structs/rearrange.py
+++ b/swarms/structs/rearrange.py
@@ -1,39 +1,52 @@
-import threading
+import traceback
+import asyncio
 import uuid
+from concurrent.futures import ThreadPoolExecutor
 from datetime import datetime
-from typing import Callable, Dict, List, Optional
+from typing import Callable, Dict, List, Literal, Optional
 
 from pydantic import BaseModel, Field
 from swarms_memory import BaseVectorDatabase
 
 from swarms.schemas.agent_step_schemas import ManySteps
 from swarms.structs.agent import Agent
+from swarms.structs.agents_available import showcase_available_agents
 from swarms.structs.base_swarm import BaseSwarm
-from swarms.structs.omni_agent_types import AgentType
+from swarms.utils.add_docs_to_agents import handle_input_docs
 from swarms.utils.loguru_logger import logger
+from swarms.utils.wrapper_clusterop import (
+    exec_callable_with_clusterops,
+)
+from swarms.utils.swarm_reliability_checks import reliability_check
+
+# Literal of output types
+OutputType = Literal[
+    "all", "final", "list", "dict", ".json", ".md", ".txt", ".yaml", ".toml"
+]
+
+
+def swarm_id():
+    return uuid.uuid4().hex
 
 
 class AgentRearrangeInput(BaseModel):
-    swarm_id: str
-    name: str
-    description: str
-    flow: str
-    max_loops: int
+    swarm_id: Optional[str] = None
+    name: Optional[str] = None
+    description: Optional[str] = None
+    flow: Optional[str] = None
+    max_loops: Optional[int] = None
     time: str = Field(
         default_factory=lambda: datetime.now().strftime(
             "%Y-%m-%d %H:%M:%S"
        ),
         description="The time the agent was created.",
     )
-
-
-def swarm_id():
-    return uuid.uuid4().hex
+    output_type: OutputType = Field(default="final")
 
 
 class AgentRearrangeOutput(BaseModel):
-    Input: AgentRearrangeInput
-    outputs: List[ManySteps]
+    Input: Optional[AgentRearrangeInput] = None
+    outputs: Optional[List[ManySteps]] = None
     time: str = Field(
         default_factory=lambda: datetime.now().strftime(
             "%Y-%m-%d %H:%M:%S"
         ),
     )
 
 
@@ -47,16 +60,38 @@ class AgentRearrange(BaseSwarm):
     A class representing a swarm of agents for rearranging tasks.
 
     Attributes:
-        agents (dict): A dictionary of agents, where the key is the agent's name and the value is the agent object.
-        flow (str): The flow pattern of the tasks.
+        id (str): Unique identifier for the swarm
+        name (str): Name of the swarm
+        description (str): Description of the swarm's purpose
+        agents (Dict[str, Agent]): Dictionary mapping agent names to Agent objects
+        flow (str): The flow pattern defining task execution order
+        max_loops (int): Maximum number of execution loops
+        verbose (bool): Whether to enable verbose logging
+        memory_system (BaseVectorDatabase): Memory system for storing agent interactions
+        human_in_the_loop (bool): Whether human intervention is enabled
+        custom_human_in_the_loop (Callable): Custom function for human intervention
+        return_json (bool): Whether to return output in JSON format
+        output_type (OutputType): Format of output ("all", "final", "list", or "dict")
+        swarm_history (dict): History of agent interactions
+        input_config (AgentRearrangeInput): Input configuration schema
+        output_schema (AgentRearrangeOutput): Output schema
 
     Methods:
-        __init__(agents: List[Agent] = None, flow: str = None): Initializes the AgentRearrange object.
-        add_agent(agent: Agent): Adds an agent to the swarm.
-        remove_agent(agent_name: str): Removes an agent from the swarm.
-        add_agents(agents: List[Agent]): Adds multiple agents to the swarm.
-        validate_flow(): Validates the flow pattern.
-        run(task): Runs the swarm to rearrange the tasks.
+        __init__(): Initializes the AgentRearrange object
+        showcase_agents(): Appends available-agent info to each agent's system prompt
+        set_custom_flow(): Sets a custom flow pattern
+        add_agent(): Adds an agent to the swarm
+        track_history(): Records agent interaction history
+        remove_agent(): Removes an agent from the swarm
+        add_agents(): Adds multiple agents to the swarm
+        validate_flow(): Validates the flow pattern
+        run(): Executes the swarm's task processing
+        __call__(): Makes the swarm callable, delegating to run()
+        batch_run(): Processes multiple tasks in batches
+        abatch_run(): Asynchronously processes multiple tasks in batches
+        concurrent_run(): Processes multiple tasks concurrently
+        handle_input_docs(): Adds document content to agent prompts
+
     """
 
     def __init__(
@@ -64,7 +99,7 @@ class AgentRearrange(BaseSwarm):
         id: str = swarm_id(),
         name: str = "AgentRearrange",
         description: str = "A swarm of agents for rearranging tasks.",
-        agents: List[AgentType] = None,
+        agents: List[Agent] = None,
         flow: str = None,
         max_loops: int = 1,
         verbose: bool = True,
@@ -74,25 +109,28 @@ class AgentRearrange(BaseSwarm):
             Callable[[str], str]
         ] = None,
         return_json: bool = False,
+        output_type: OutputType = "final",
+        docs: List[str] = None,
+        doc_folder: str = None,
         *args,
         **kwargs,
     ):
-        """
-        Initializes the AgentRearrange object.
-
-        Args:
-            agents (List[Agent], optional): A list of Agent objects. Defaults to None.
-            flow (str, optional): The flow pattern of the tasks. Defaults to None.
-        """
+        # reliability_check(
+        #     agents=agents,
+        #     name=name,
+        #     description=description,
+        #     flow=flow,
+        #     max_loops=max_loops,
+        # )
         super(AgentRearrange, self).__init__(
             name=name,
             description=description,
-            agents=agents,
+            agents=agents if agents else [],
             *args,
             **kwargs,
         )
         self.id = id
-        self.agents = {agent.name: agent for agent in agents}
+        self.agents = {agent.agent_name: agent for agent in agents}
         self.flow = flow if flow is not None else ""
         self.verbose = verbose
         self.max_loops = max_loops if max_loops > 0 else 1
@@ -100,14 +138,14 @@ class AgentRearrange(BaseSwarm):
         self.human_in_the_loop = human_in_the_loop
         self.custom_human_in_the_loop = custom_human_in_the_loop
         self.return_json = return_json
+        self.output_type = output_type
+        self.docs = docs
+        self.doc_folder = doc_folder
         self.swarm_history = {
             agent.agent_name: [] for agent in agents
         }
-        self.lock = threading.Lock()
-        self.id = uuid.uuid4().hex if id is None else id
 
-        # Run the relianility checks
-        self.reliability_checks()
+        self.id = uuid.uuid4().hex if id is None else id
 
         # Output schema
         self.input_config = AgentRearrangeInput(
@@ -116,6 +154,7 @@ class AgentRearrange(BaseSwarm):
             description=self.description,
             flow=self.flow,
             max_loops=self.max_loops,
+            output_type=self.output_type,
         )
 
         # Output schema
@@ -124,31 +163,36 @@ class AgentRearrange(BaseSwarm):
             outputs=[],
         )
 
-    def reliability_checks(self):
-        logger.info("Running reliability checks.")
-        if self.agents is None:
-            raise ValueError("No agents found in the swarm.")
-
-        if self.flow is None:
-            raise ValueError("No flow found in the swarm.")
+        # Optionally fold input documents into the agents
+        # self.handle_input_docs()
 
-        if self.max_loops is None:
-            raise ValueError("No max_loops found in the swarm.")
+        # Show the agents who are in the swarm
+        self.showcase_agents()
 
-        logger.info(
-            "AgentRearrange initialized with agents: {}".format(
-                list(self.agents.keys())
-            )
+    def showcase_agents(self):
+        # Get formatted agent info once; self.agents is a dict, so pass its values
+        agents_available = showcase_available_agents(
+            name=self.name,
+            description=self.description,
+            agents=list(self.agents.values()),
         )
-
-        # Verbose is True
-        if self.verbose is True:
-            logger.add("agent_rearrange.log")
+        # Update all agents in one pass using values()
+        for agent in self.agents.values():
+            if isinstance(agent, Agent):
+                agent.system_prompt += agents_available
 
     def set_custom_flow(self, flow: str):
         self.flow = flow
         logger.info(f"Custom flow set: {flow}")
 
+    def handle_input_docs(self):
+        self.agents = handle_input_docs(
+            agents=self.agents,
+            docs=self.docs,
+            doc_folder=self.doc_folder,
+        )
+
     def add_agent(self, agent: Agent):
         """
         Adds an agent to the swarm.
 
         Args:
             agent (Agent): The agent to be added.
         """
-        logger.info(f"Adding agent {agent.name} to the swarm.")
-        self.agents[agent.name] = agent
+        logger.info(f"Adding agent {agent.agent_name} to the swarm.")
+        self.agents[agent.agent_name] = agent
 
     def track_history(
         self,
@@ -183,7 +227,7 @@ class AgentRearrange(BaseSwarm):
             agents (List[Agent]): A list of Agent objects.
         """
         for agent in agents:
-            self.agents[agent.name] = agent
+            self.agents[agent.agent_name] = agent
 
     def validate_flow(self):
         """
@@ -226,10 +270,10 @@ class AgentRearrange(BaseSwarm):
                 "Duplicate agent names in the flow are not allowed."
             )
 
-        print("Flow is valid.")
+        logger.info(f"Flow: {self.flow} is valid.")
         return True
 
-    def run(
+    def _run(
         self,
         task: str = None,
         img: str = None,
@@ -241,51 +285,73 @@ class AgentRearrange(BaseSwarm):
         Runs the swarm to rearrange the tasks.
 
         Args:
-            task: The initial task to be processed.
+            task (str, optional): The initial task to be processed. Defaults to None.
+            img (str, optional): Image input for agents that support it. Defaults to None.
+            custom_tasks (Dict[str, str], optional): Custom tasks for specific agents. Defaults to None.
+            output_type (str, optional): Format of the output. Can be:
+                - "all": String containing all agent responses concatenated
+                - "final": Only the final agent's response
+                - "list": List of all agent responses
+                - "dict": Dict mapping agent names to their responses
+                Defaults to "final".
+            *args: Additional positional arguments
+            **kwargs: Additional keyword arguments
 
         Returns:
-            str: The final processed task.
+            Union[str, List[str], Dict[str, str]]: The processed output in the specified format
+
+        Raises:
+            ValueError: If flow validation fails
+            Exception: For any other errors during execution
         """
         try:
             if not self.validate_flow():
+                logger.error("Flow validation failed")
                 return "Invalid flow configuration."
 
             tasks = self.flow.split("->")
             current_task = task
+            all_responses = []
+            response_dict = {}
+
+            logger.info(
+                f"Starting task execution with {len(tasks)} steps"
+            )
 
-            # If custom_tasks have the agents name and tasks then combine them
+            # Handle custom tasks
             if custom_tasks is not None:
+                logger.info("Processing custom tasks")
                 c_agent_name, c_task = next(
                     iter(custom_tasks.items())
                 )
-
-                # Find the position of the custom agent in the tasks list
                 position = tasks.index(c_agent_name)
 
-                # If there is a prebois agent merge its task with the custom tasks
                 if position > 0:
                     tasks[position - 1] += "->" + c_task
                 else:
-                    # If there is no prevous agent just insert the custom tasks
                     tasks.insert(position, c_task)
 
-            # Set the loop counter
             loop_count = 0
             while loop_count < self.max_loops:
+                logger.info(
+                    f"Starting loop {loop_count + 1}/{self.max_loops}"
+                )
+
                 for task in tasks:
                     is_last = task == tasks[-1]
                     agent_names = [
                         name.strip() for name in task.split(",")
                     ]
+
                     if len(agent_names) > 1:
                         # Parallel processing
                         logger.info(
                             f"Running agents in parallel: {agent_names}"
                         )
                         results = []
+
                         for agent_name in agent_names:
                             if agent_name == "H":
-                                # Human in the loop intervention
                                 if (
                                     self.human_in_the_loop
                                     and self.custom_human_in_the_loop
                                 ):
                                     current_task = (
                                         self.custom_human_in_the_loop(
                                             current_task
                                         )
                                     )
                                 else:
                                     current_task = input(
                                         "Enter your response:"
                                     )
+                                results.append(current_task)
+                                response_dict[agent_name] = (
+                                    current_task
+                                )
                             else:
                                 agent = self.agents[agent_name]
                                 result = agent.run(
-                                    current_task,
-                                    img,
-                                    is_last,
+                                    task=current_task,
+                                    img=img,
+                                    is_last=is_last,
                                     *args,
                                     **kwargs,
                                 )
                                 results.append(result)
+                                response_dict[agent_name] = result
                                 self.output_schema.outputs.append(
                                     agent.agent_output
                                 )
+                                logger.debug(
+                                    f"Agent {agent_name} output: {result}"
+                                )
 
                         current_task = "; ".join(results)
+                        all_responses.extend(results)
+
                     else:
                         # Sequential processing
                         logger.info(
-                            f"Running agents sequentially: {agent_names}"
+                            f"Running agent sequentially: {agent_names[0]}"
                         )
                         agent_name = agent_names[0]
+
                         if agent_name == "H":
-                            # Human-in-the-loop intervention
                             if (
                                 self.human_in_the_loop
                                 and self.custom_human_in_the_loop
                             ):
                                 current_task = (
                                     self.custom_human_in_the_loop(
                                         current_task
                                     )
                                 )
                             else:
                                 current_task = input(
                                     "Enter the next task: "
                                 )
+                            response_dict[agent_name] = current_task
                         else:
                             agent = self.agents[agent_name]
                             current_task = agent.run(
-                                current_task,
-                                img,
-                                is_last,
+                                task=current_task,
+                                img=img,
+                                is_last=is_last,
                                 *args,
                                 **kwargs,
                             )
+                            response_dict[agent_name] = current_task
                             self.output_schema.outputs.append(
                                 agent.agent_output
                             )
+                            logger.debug(
+                                f"Agent {agent_name} output: {current_task}"
+                            )
+
+                        all_responses.append(current_task)
+
                 loop_count += 1
 
-            # return current_task
+            logger.info("Task execution completed")
+
             if self.return_json:
                 return self.output_schema.model_dump_json(indent=4)
-            else:
-                return current_task
+
+            # Handle different output types
+            if self.output_type == "all":
+                output = " ".join(all_responses)
+            elif self.output_type == "list":
+                output = all_responses
+            elif self.output_type == "dict":
+                output = response_dict
+            else:  # "final"
+                output = current_task
+
+            return output
 
         except Exception as e:
-            logger.error(f"An error occurred: {e}")
+            logger.error(
+                f"An error occurred: {e} \n {traceback.format_exc()}"
+            )
             return e
 
-    async def astream(
+    def run(
         self,
         task: str = None,
         img: str = None,
-        custom_tasks: Dict[str, str] = None,
+        device: str = "cpu",
+        device_id: int = 1,
+        all_cores: bool = True,
+        all_gpus: bool = False,
         *args,
         **kwargs,
     ):
         """
-        Runs the swarm with LangChain's astream_events v1 API enabled.
-        NOTICE: Be sure to only call this method if you are using LangChain-based models in your swarm.
-        This is useful for enhancing user experience by providing real-time updates of how each agent
-        in the swarm is processing the current task.
+        Execute the agent rearrangement task with specified compute resources.
 
         Args:
-            task: The initial prompt (aka task) passed to the first agent(s) in the swarm.
+            task (str, optional): The task to execute. Defaults to None.
+            img (str, optional): Path to input image if required. Defaults to None.
+            device (str, optional): Computing device to use ('cpu' or 'gpu'). Defaults to "cpu".
+            device_id (int, optional): ID of specific device to use. Defaults to 1.
+            all_cores (bool, optional): Whether to use all CPU cores. Defaults to True.
+            all_gpus (bool, optional): Whether to use all available GPUs. Defaults to False.
+            *args: Additional positional arguments passed to _run().
+            **kwargs: Additional keyword arguments passed to _run().
 
         Returns:
-            str: The final output generated.
+            The result from executing the task through the cluster operations wrapper.
         """
-        try:
-            if not self.validate_flow():
-                return "Invalid flow configuration."
-
-            tasks = self.flow.split("->")
-            current_task = task
+        return exec_callable_with_clusterops(
+            device=device,
+            device_id=device_id,
+            all_cores=all_cores,
+            all_gpus=all_gpus,
+            func=self._run,
+            task=task,
+            img=img,
+            *args,
+            **kwargs,
+        )
 
-            # If custom_tasks have the agents name and tasks then combine them
-            if custom_tasks is not None:
-                c_agent_name, c_task = next(
-                    iter(custom_tasks.items())
-                )
+    def __call__(self, task: str, *args, **kwargs):
+        """
+        Make the class callable by executing the run() method.
 
-                # Find the position of the custom agent in the tasks list
-                position = tasks.index(c_agent_name)
+        Args:
+            task (str): The task to execute.
+            *args: Additional positional arguments passed to run().
+            **kwargs: Additional keyword arguments passed to run().
 
-                # If there is a prebois agent merge its task with the custom tasks
-                if position > 0:
-                    tasks[position - 1] += "->" + c_task
-                else:
-                    # If there is no prevous agent just insert the custom tasks
-                    tasks.insert(position, c_task)
+        Returns:
+            The result from executing run().
+        """
+        return self.run(task=task, *args, **kwargs)
 
-            logger.info("TASK:", task)
+    def batch_run(
+        self,
+        tasks: List[str],
+        img: Optional[List[str]] = None,
+        batch_size: int = 10,
+        device: str = "cpu",
+        device_id: int = None,
+        all_cores: bool = True,
+        all_gpus: bool = False,
+        *args,
+        **kwargs,
+    ) -> List[str]:
+        """
+        Process multiple tasks in batches.
 
-            # Set the loop counter
-            loop_count = 0
-            while loop_count < self.max_loops:
-                for task in tasks:
-                    agent_names = [
-                        name.strip() for name in task.split(",")
-                    ]
-                    if len(agent_names) > 1:
-                        # Parallel processing
-                        logger.info(
-                            f"Running agents in parallel: {agent_names}"
-                        )
-                        results = []
-                        for agent_name in agent_names:
-                            if agent_name == "H":
-                                # Human in the loop intervention
-                                if (
-                                    self.human_in_the_loop
-                                    and self.custom_human_in_the_loop
-                                ):
-                                    current_task = (
-                                        self.custom_human_in_the_loop(
-                                            current_task
-                                        )
-                                    )
-                                else:
-                                    current_task = input(
-                                        "Enter your response:"
-                                    )
-                            else:
-                                agent = self.agents[agent_name]
-                                result = None
-                                # As the current `swarms` package is using LangChain v0.1 we need to use the v0.1 version of the `astream_events` API
-                                # Below is the link to the `astream_events` spec as outlined in the LangChain v0.1 docs
-                                # https://python.langchain.com/v0.1/docs/expression_language/streaming/#event-reference
-                                # Below is the link to the `astream_events` spec as outlined in the LangChain v0.2 docs
-                                # https://python.langchain.com/v0.2/docs/versions/v0_2/migrating_astream_events/
-                                async for evt in agent.astream_events(
-                                    current_task, version="v1"
-                                ):
-                                    # print(evt) # <- useful when building/debugging
-                                    if evt["event"] == "on_llm_end":
-                                        result = evt["data"]["output"]
-                                        print(agent.name, result)
-                                results.append(result)
+        Args:
+            tasks: List of tasks to process
+            img: Optional list of images corresponding to tasks
+            batch_size: Number of tasks to process simultaneously
+            device: Computing device to use
+            device_id: Specific device ID if applicable
+            all_cores: Whether to use all CPU cores
+            all_gpus: Whether to use all available GPUs
 
-                        current_task = ""
-                        for index, res in enumerate(results):
-                            current_task += (
-                                "# OUTPUT of "
-                                + agent_names[index]
-                                + ""
-                                + res
-                                + "\n\n"
-                            )
-                    else:
-                        # Sequential processing
-                        logger.info(
-                            f"Running agents sequentially: {agent_names}"
-                        )
+        Returns:
+            List of results corresponding to input tasks
+        """
+        results = []
+        for i in range(0, len(tasks), batch_size):
+            batch_tasks = tasks[i : i + batch_size]
+            batch_imgs = (
+                img[i : i + batch_size]
+                if img
+                else [None] * len(batch_tasks)
+            )
 
-                        agent_name = agent_names[0]
-                        if agent_name == "H":
-                            # Human-in-the-loop intervention
-                            if (
-                                self.human_in_the_loop
-                                and self.custom_human_in_the_loop
-                            ):
-                                current_task = (
-                                    self.custom_human_in_the_loop(
-                                        current_task
-                                    )
-                                )
-                            else:
-                                current_task = input(
-                                    "Enter the next task: "
-                                )
-                        else:
-                            agent = self.agents[agent_name]
-                            result = None
-                            # As the current `swarms` package is using LangChain v0.1 we need to use the v0.1 version of the `astream_events` API
-                            # Below is the link to the `astream_events` spec as outlined in the LangChain v0.1 docs
-                            # https://python.langchain.com/v0.1/docs/expression_language/streaming/#event-reference
-                            # Below is the link to the `astream_events` spec as outlined in the LangChain v0.2 docs
-                            # https://python.langchain.com/v0.2/docs/versions/v0_2/migrating_astream_events/
-                            async for evt in agent.astream_events(
-                                f"SYSTEM: {agent.system_prompt}\nINPUT:{current_task}",
-                                version="v1",
-                            ):
-                                # print(evt) # <- useful when building/debugging
-                                if evt["event"] == "on_llm_end":
-                                    result = evt["data"]["output"]
-                                    print(
-                                        agent.name, "result", result
-                                    )
-                            current_task = result
+            # Tasks within a batch are dispatched via run(); batches are
+            # processed one after another
+            batch_results = [
+                self.run(
+                    task=task,
+                    img=img_path,
+                    device=device,
+                    device_id=device_id,
+                    all_cores=all_cores,
+                    all_gpus=all_gpus,
+                    *args,
+                    **kwargs,
+                )
+                for task, img_path in zip(batch_tasks, batch_imgs)
+            ]
+            results.extend(batch_results)
 
-                loop_count += 1
+        return results
 
-            return current_task
-        except Exception as e:
-            logger.error(f"An error occurred: {e}")
-            return e
+    async def abatch_run(
+        self,
+        tasks: List[str],
+        img: Optional[List[str]] = None,
+        batch_size: int = 10,
+        *args,
+        **kwargs,
+    ) -> List[str]:
+        """
+        Asynchronously process multiple tasks in batches.
 
-    def process_agent_or_swarm(
-        self, name: str, task: str, img: str, is_last, *args, **kwargs
-    ):
+        Args:
+            tasks: List of tasks to process
+            img: Optional list of images corresponding to tasks
+            batch_size: Number of tasks to process simultaneously
+
+        Returns:
+            List of results corresponding to input tasks
         """
+        results = []
+        for i in range(0, len(tasks), batch_size):
+            batch_tasks = tasks[i : i + batch_size]
+            batch_imgs = (
+                img[i : i + batch_size]
+                if img
+                else [None] * len(batch_tasks)
+            )
 
-        process_agent_or_swarm: Processes the agent or sub-swarm based on the given name.
+            # Process batch using asyncio.gather; astream() is removed in
+            # this change, so each run() call is pushed to a worker thread
+            batch_coros = [
+                asyncio.to_thread(
+                    self.run, task=task, img=img_path, *args, **kwargs
+                )
+                for task, img_path in zip(batch_tasks, batch_imgs)
+            ]
+            batch_results = await asyncio.gather(*batch_coros)
+            results.extend(batch_results)
 
-        Args:
-            name (str): The name of the agent or sub-swarm to process.
-            task (str): The task to be executed.
-            img (str): The image to be processed by the agents.
-            *args: Variable length argument list.
-            **kwargs: Arbitrary keyword arguments.
+        return results
 
-        Returns:
-            str: The result of the last executed task.
+    def concurrent_run(
+        self,
+        tasks: List[str],
+        img: Optional[List[str]] = None,
+        max_workers: Optional[int] = None,
+        device: str = "cpu",
+        device_id: int = None,
+        all_cores: bool = True,
+        all_gpus: bool = False,
+        *args,
+        **kwargs,
+    ) -> List[str]:
+        """
+        Process multiple tasks concurrently using ThreadPoolExecutor.
 
+        Args:
+            tasks: List of tasks to process
+            img: Optional list of images corresponding to tasks
+            max_workers: Maximum number of worker threads
+            device: Computing device to use
+            device_id: Specific device ID if applicable
+            all_cores: Whether to use all CPU cores
+            all_gpus: Whether to use all available GPUs
+
+        Returns:
+            List of results corresponding to input tasks
         """
-        if name.startswith("Human"):
-            return self.human_intervention(task)
-        elif name in self.sub_swarm:
-            return self.run_sub_swarm(
-                task, name, img, *args, **kwargs
-            )
-        else:
-            agent = self.agents[name]
-            return agent.run(task, img, is_last, *args, **kwargs)
+        with ThreadPoolExecutor(max_workers=max_workers) as executor:
+            imgs = img if img else [None] * len(tasks)
+            futures = [
+                executor.submit(
+                    self.run,
+                    task=task,
+                    img=img_path,
+                    device=device,
+                    device_id=device_id,
+                    all_cores=all_cores,
+                    all_gpus=all_gpus,
+                    *args,
+                    **kwargs,
+                )
+                for task, img_path in zip(tasks, imgs)
+            ]
+            return [future.result() for future in futures]
 
-    def human_intervention(self, task: str) -> str:
-        if self.human_in_the_loop and self.custom_human_in_the_loop:
-            return self.custom_human_in_the_loop(task)
-        else:
-            return input(
-                "Human intervention required. Enter your response: "
-            )
 
 def rearrange(
Enter your response: "
-        )
+        with ThreadPoolExecutor(max_workers=max_workers) as executor:
+            imgs = img if img else [None] * len(tasks)
+            futures = [
+                executor.submit(
+                    self.run,
+                    task=task,
+                    img=img_path,
+                    device=device,
+                    device_id=device_id,
+                    all_cores=all_cores,
+                    all_gpus=all_gpus,
+                    *args,
+                    **kwargs,
+                )
+                for task, img_path in zip(tasks, imgs)
+            ]
+            return [future.result() for future in futures]
 
     def rearrange(
diff --git a/swarms/structs/sequential_workflow.py b/swarms/structs/sequential_workflow.py
index cc4a1865..0b576df9 100644
--- a/swarms/structs/sequential_workflow.py
+++ b/swarms/structs/sequential_workflow.py
@@ -1,15 +1,18 @@
 from typing import List
 from swarms.structs.agent import Agent
 from swarms.utils.loguru_logger import logger
-from swarms.structs.rearrange import AgentRearrange
-from swarms.structs.base_swarm import BaseSwarm
+from swarms.structs.rearrange import AgentRearrange, OutputType
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from swarms.structs.agents_available import showcase_available_agents
 
 
-class SequentialWorkflow(BaseSwarm):
+class SequentialWorkflow:
     """
-    Initializes a SequentialWorkflow object.
+    Initializes a SequentialWorkflow object, which orchestrates the execution of a sequence of agents.
 
     Args:
+        name (str, optional): The name of the workflow. Defaults to "SequentialWorkflow".
+        description (str, optional): A description of the workflow. Defaults to "Sequential Workflow, where agents are executed in a sequence."
         agents (List[Agent], optional): The list of agents in the workflow. Defaults to None.
         max_loops (int, optional): The maximum number of loops to execute the workflow. Defaults to 1.
         *args: Variable length argument list.
@@ -23,49 +26,170 @@ class SequentialWorkflow(BaseSwarm):
         self,
         name: str = "SequentialWorkflow",
         description: str = "Sequential Workflow, where agents are executed in a sequence.",
-        agents: List[Agent] = None,
+        agents: List[Agent] = [],
         max_loops: int = 1,
+        output_type: OutputType = "all",
+        return_json: bool = False,
+        shared_memory_system: callable = None,
         *args,
         **kwargs,
     ):
-        if agents is None or len(agents) == 0:
+        self.name = name
+        self.description = description
+        self.agents = agents
+        self.max_loops = max_loops
+        self.output_type = output_type
+        self.return_json = return_json
+        self.shared_memory_system = shared_memory_system
+
+        self.reliability_check()
+
+        # Build the flow once and keep it on the instance so the run/log
+        # methods below can reference self.flow without raising AttributeError
+        self.flow = self.sequential_flow()
+
+        self.agent_rearrange = AgentRearrange(
+            name=name,
+            description=description,
+            agents=agents,
+            flow=self.flow,
+            max_loops=max_loops,
+            output_type=output_type,
+            return_json=return_json,
+            shared_memory_system=shared_memory_system,
+            *args,
+            **kwargs,
+        )
+
+        # Handle agent showcase
+        self.handle_agent_showcase()
+
+    def sequential_flow(self):
+        # Only create flow if agents exist
+        if self.agents:
+            # Create flow by joining agent names with arrows
+            agent_names = []
+            for agent in self.agents:
+                try:
+                    # Try to get agent_name, fall back to name if not available
+                    agent_name = getattr(agent, 'agent_name', None) or agent.name
+                    agent_names.append(agent_name)
+                except AttributeError:
+                    logger.warning(f"Could not get name for agent {agent}")
+                    continue
+
+            if agent_names:
+                flow = " -> ".join(agent_names)
+            else:
+                flow = ""
+                logger.warning("No valid agent names found to create flow")
+        else:
+            flow = ""
+            logger.warning("No agents provided to create flow")
+
+        return flow
+
+    def reliability_check(self):
+        if self.agents is None or len(self.agents) == 0:
             raise ValueError("Agents list cannot be None or empty")
-        if max_loops == 0:
+        if self.max_loops == 0:
             raise ValueError("max_loops cannot be 0")
+
+        # Compare against the concrete set of allowed values; membership
+        # cannot be tested against the OutputType alias itself
+        if self.output_type not in (
+            "all", "final", "list", "dict",
+            ".json", ".md", ".txt", ".yaml", ".toml",
+        ):
+            raise ValueError(
+                "output_type must be 'all', 'final', 'list', 'dict', '.json', '.md', '.txt', '.yaml', or '.toml'"
+            )
+
+        logger.info("Checks completed; your swarm is ready.")
+
+    def handle_agent_showcase(self):
+        # Get the showcase string once instead of regenerating it for each agent
+        showcase_str = showcase_available_agents(
+            name=self.name,
+            description=self.description,
+            agents=self.agents,
+        )
+
+        # Append the showcase string to each agent's existing system prompt
+        for agent in self.agents:
+            agent.system_prompt += showcase_str
+
+    def run(
+        self,
+        task: str,
+        device: str = "cpu",
+        all_cpus: bool = False,
+        auto_gpu: bool = False,
+        *args,
+        **kwargs,
+    ) -> str:
+        """
+        Executes a task through the agents in the dynamically constructed flow.
+
+        Args:
+            task (str): The task for the agents to execute.
+            device (str, optional): Computing device to run on. Defaults to "cpu".
+            all_cpus (bool, optional): Whether to use all CPU cores. Defaults to False.
+            auto_gpu (bool, optional): Whether to select a GPU automatically. Defaults to False.
+
+        Returns:
+            str: The final result after processing through all agents.
+
+        Raises:
+            ValueError: If task is None or empty
+            Exception: If any error occurs during task execution
+        """
         try:
-            super().__init__(
-                name=name,
-                description=description,
-                agents=agents,
+            logger.info(
+                f"Executing task with dynamic flow: {self.flow}"
+            )
+            return self.agent_rearrange.run(
+                task,
+                device=device,
+                all_cpus=all_cpus,
+                auto_gpu=auto_gpu,
                 *args,
                 **kwargs,
             )
-            self.name = name
-            self.description = description
-            self.agents = agents
-            self.flow = " -> ".join(
-                agent.agent_name for agent in agents
+        except Exception as e:
+            logger.error(
+                f"An error occurred while executing the task: {e}"
             )
-            self.agent_rearrange = AgentRearrange(
-                name=name,
-                description=description,
-                agents=agents,
-                flow=self.flow,
-                max_loops=max_loops,
-                *args,
-                **kwargs,
+            raise
+
+    def __call__(self, task: str, *args, **kwargs) -> str:
+        return self.run(task, *args, **kwargs)
+
+    def run_batched(self, tasks: List[str]) -> List[str]:
+        """
+        Executes a batch of tasks through the agents in the dynamically constructed flow.
+
+        Args:
+            tasks (List[str]): The tasks for the agents to execute.
+
+        Returns:
+            List[str]: The final results after processing through all agents.
+
+        Raises:
+            ValueError: If tasks is None or empty
+            Exception: If any error occurs during task execution
+        """
+        if not tasks or not all(
+            isinstance(task, str) for task in tasks
+        ):
+            raise ValueError(
+                "Tasks must be a non-empty list of strings"
+            )
+
+        try:
+            logger.info(
+                f"Executing batch of tasks with dynamic flow: {self.flow}"
             )
+            return [self.agent_rearrange.run(task) for task in tasks]
         except Exception as e:
             logger.error(
-                f"Error initializing SequentialWorkflow: {str(e)}"
+                f"An error occurred while executing the batch of tasks: {e}"
             )
             raise
 
-    def run(self, task: str) -> str:
+    async def run_async(self, task: str) -> str:
         """
-        Runs the task through the agents in the dynamically constructed flow.
+        Executes the task through the agents in the dynamically constructed flow asynchronously.
 
         Args:
             task (str): The task for the agents to execute.
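A quick orientation on the reworked SequentialWorkflow API before the remaining hunks: the class now derives its flow string from the agents' names and delegates all execution to an internal AgentRearrange. The sketch below is illustrative only, not part of the diff; it assumes an OpenAIChat-style model object as used elsewhere in this repository, and the agent names are hypothetical.

import os

from swarm_models import OpenAIChat
from swarms import Agent
from swarms.structs.sequential_workflow import SequentialWorkflow

model = OpenAIChat(
    api_key=os.getenv("OPENAI_API_KEY"), model_name="gpt-4o-mini"
)

# Two hypothetical agents; any Agent exposing agent_name works here
researcher = Agent(
    agent_name="Researcher",
    system_prompt="Gather the key facts for the task.",
    llm=model,
    max_loops=1,
)
writer = Agent(
    agent_name="Writer",
    system_prompt="Write a concise summary from the research.",
    llm=model,
    max_loops=1,
)

workflow = SequentialWorkflow(
    name="ResearchThenWrite",
    agents=[researcher, writer],
    max_loops=1,
    output_type="all",
)

print(workflow.sequential_flow())  # "Researcher -> Writer"
result = workflow.run("Summarize recent advances in swarm intelligence")
batch = workflow.run_batched(["Outline topic A", "Outline topic B"])

Note that run_batched simply loops over agent_rearrange.run, while run_concurrent (added in the next hunk) fans the tasks out over a thread pool.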
@@ -82,11 +206,51 @@ class SequentialWorkflow(BaseSwarm):
 
         try:
             logger.info(
-                f"Running task with dynamic flow: {self.flow}"
+                f"Executing task with dynamic flow asynchronously: {self.flow}"
+            )
+            return await self.agent_rearrange.run_async(task)
+        except Exception as e:
+            logger.error(
+                f"An error occurred while executing the task asynchronously: {e}"
+            )
+            raise
+
+    async def run_concurrent(self, tasks: List[str]) -> List[str]:
+        """
+        Executes a batch of tasks through the agents in the dynamically constructed flow concurrently.
+
+        Args:
+            tasks (List[str]): The tasks for the agents to execute.
+
+        Returns:
+            List[str]: The final results after processing through all agents.
+
+        Raises:
+            ValueError: If tasks is None or empty
+            Exception: If any error occurs during task execution
+        """
+        if not tasks or not all(
+            isinstance(task, str) for task in tasks
+        ):
+            raise ValueError(
+                "Tasks must be a non-empty list of strings"
+            )
+
+        try:
+            logger.info(
+                f"Executing batch of tasks with dynamic flow concurrently: {self.flow}"
             )
-            return self.agent_rearrange.run(task)
+            with ThreadPoolExecutor() as executor:
+                results = [
+                    executor.submit(self.agent_rearrange.run, task)
+                    for task in tasks
+                ]
+                return [
+                    result.result()
+                    for result in as_completed(results)
+                ]
         except Exception as e:
             logger.error(
-                f"An error occurred while running the task: {e}"
+                f"An error occurred while executing the batch of tasks concurrently: {e}"
             )
             raise
diff --git a/swarms/structs/swarm_arange.py b/swarms/structs/swarm_arange.py
index b008c5ec..4e57facd 100644
--- a/swarms/structs/swarm_arange.py
+++ b/swarms/structs/swarm_arange.py
@@ -4,6 +4,7 @@ import uuid
 from typing import Any, Callable, Dict, List, Optional
 
 from swarms.utils.loguru_logger import logger
+from swarms.utils.any_to_str import any_to_str
 
 
 def swarm_id():
@@ -29,23 +30,27 @@ class SwarmRearrange:
     A class representing a swarm of swarms for rearranging tasks.
 
     Attributes:
-        swarms (dict): A dictionary of swarms, where the key is the swarm's name and the value is the swarm object.
-        flow (str): The flow pattern of the tasks.
-        max_loops (int): The maximum number of loops to run the swarm.
-        verbose (bool): A flag indicating whether to log verbose messages.
-        human_in_the_loop (bool): A flag indicating whether human intervention is required.
-        custom_human_in_the_loop (Callable[[str], str], optional): A custom function for human-in-the-loop intervention.
-        return_json (bool): A flag indicating whether to return the result in JSON format.
-        swarm_history (dict): A dictionary to keep track of the history of each swarm.
-        lock (threading.Lock): A lock for thread-safe operations.
+        id (str): Unique identifier for the swarm arrangement
+        name (str): Name of the swarm arrangement
+        description (str): Description of what this swarm arrangement does
+        swarms (dict): A dictionary of swarms, where the key is the swarm's name and the value is the swarm object
+        flow (str): The flow pattern of the tasks
+        max_loops (int): The maximum number of loops to run the swarm
+        verbose (bool): A flag indicating whether to log verbose messages
+        human_in_the_loop (bool): A flag indicating whether human intervention is required
+        custom_human_in_the_loop (Callable[[str], str], optional): A custom function for human-in-the-loop intervention
+        return_json (bool): A flag indicating whether to return the result in JSON format
+        swarm_history (dict): A dictionary to keep track of the history of each swarm
+        lock (threading.Lock): A lock for thread-safe operations
 
     Methods:
-        __init__(swarms: List[swarm] = None, flow: str = None): Initializes the SwarmRearrange object.
-        add_swarm(swarm: swarm): Adds an swarm to the swarm.
-        remove_swarm(swarm_name: str): Removes an swarm from the swarm.
-        add_swarms(swarms: List[swarm]): Adds multiple swarms to the swarm.
-        validate_flow(): Validates the flow pattern.
-        run(task): Runs the swarm to rearrange the tasks.
+        __init__(id: str, name: str, description: str, swarms: List[swarm], flow: str, max_loops: int, verbose: bool,
+            human_in_the_loop: bool, custom_human_in_the_loop: Callable, return_json: bool): Initializes the SwarmRearrange object
+        add_swarm(swarm: swarm): Adds a swarm to the arrangement
+        remove_swarm(swarm_name: str): Removes a swarm from the arrangement
+        add_swarms(swarms: List[swarm]): Adds multiple swarms to the arrangement
+        validate_flow(): Validates the flow pattern
+        run(task): Runs the swarm to rearrange the tasks
     """
 
     def __init__(
@@ -69,8 +74,16 @@ class SwarmRearrange:
         Initializes the SwarmRearrange object.
 
         Args:
-            swarms (List[swarm], optional): A list of swarm objects. Defaults to None.
-            flow (str, optional): The flow pattern of the tasks. Defaults to None.
+            id (str): Unique identifier for the swarm arrangement. Defaults to a generated UUID.
+            name (str): Name of the swarm arrangement. Defaults to "SwarmRearrange".
+            description (str): Description of what this swarm arrangement does.
+            swarms (List[swarm]): A list of swarm objects. Defaults to an empty list.
+            flow (str): The flow pattern of the tasks. Defaults to None.
+            max_loops (int): Maximum number of loops to run. Defaults to 1.
+            verbose (bool): Whether to log verbose messages. Defaults to True.
+            human_in_the_loop (bool): Whether human intervention is required. Defaults to False.
+            custom_human_in_the_loop (Callable): Custom function for human intervention. Defaults to None.
+            return_json (bool): Whether to return results as JSON. Defaults to False.
         """
         self.id = id
         self.name = name
@@ -271,6 +284,7 @@ class SwarmRearrange:
                     result = swarm.run(
                         current_task, img, *args, **kwargs
                     )
+                    result = any_to_str(result)
                     logger.info(
                         f"Swarm {swarm_name} returned result of type: {type(result)}"
                     )
@@ -312,6 +326,7 @@ class SwarmRearrange:
                     result = swarm.run(
                         current_task, img, *args, **kwargs
                     )
+                    result = any_to_str(result)
                     logger.info(
                         f"Swarm {swarm_name} returned result of type: {type(result)}"
                     )
@@ -371,6 +386,7 @@ def swarm_arrange(
         flow,
     )
     result = swarm_arrangement.run(task, *args, **kwargs)
+    result = any_to_str(result)
     logger.info(
         f"Swarm arrangement {name} executed successfully with output type {output_type}."
) diff --git a/swarms/structs/swarm_matcher.py b/swarms/structs/swarm_matcher.py index ebbfa0a3..37d75eac 100644 --- a/swarms/structs/swarm_matcher.py +++ b/swarms/structs/swarm_matcher.py @@ -1,338 +1,104 @@ -# from typing import List, Tuple, Optional -# import numpy as np -# import torch -# from transformers import AutoTokenizer, AutoModel -# from pydantic import BaseModel, Field -# from loguru import logger -# import json -# from tenacity import retry, stop_after_attempt, wait_exponential -# from uuid import uuid4 - - -# class SwarmType(BaseModel): -# name: str -# description: str -# embedding: Optional[List[float]] = Field( -# default=None, exclude=True -# ) - - -# class SwarmMatcherConfig(BaseModel): -# model_name: str = "sentence-transformers/all-MiniLM-L6-v2" -# embedding_dim: int = ( -# 512 # Dimension of the sentence-transformers model -# ) - - -# class SwarmMatcher: -# """ -# A class for matching tasks to swarm types based on their descriptions. -# It utilizes a transformer model to generate embeddings for task and swarm type descriptions, -# and then calculates the dot product to find the best match. -# """ - -# def __init__(self, config: SwarmMatcherConfig): -# """ -# Initializes the SwarmMatcher with a configuration. - -# Args: -# config (SwarmMatcherConfig): The configuration for the SwarmMatcher. -# """ -# logger.add("swarm_matcher_debug.log", level="DEBUG") -# logger.debug("Initializing SwarmMatcher") -# try: -# self.config = config -# self.tokenizer = AutoTokenizer.from_pretrained( -# config.model_name -# ) -# self.model = AutoModel.from_pretrained(config.model_name) -# self.swarm_types: List[SwarmType] = [] -# logger.debug("SwarmMatcher initialized successfully") -# except Exception as e: -# logger.error(f"Error initializing SwarmMatcher: {str(e)}") -# raise - -# @retry( -# stop=stop_after_attempt(3), -# wait=wait_exponential(multiplier=1, min=4, max=10), -# ) -# def get_embedding(self, text: str) -> np.ndarray: -# """ -# Generates an embedding for a given text using the configured model. - -# Args: -# text (str): The text for which to generate an embedding. - -# Returns: -# np.ndarray: The embedding vector for the text. -# """ -# logger.debug(f"Getting embedding for text: {text[:50]}...") -# try: -# inputs = self.tokenizer( -# text, -# return_tensors="pt", -# padding=True, -# truncation=True, -# max_length=512, -# ) -# with torch.no_grad(): -# outputs = self.model(**inputs) -# embedding = ( -# outputs.last_hidden_state.mean(dim=1) -# .squeeze() -# .numpy() -# ) -# logger.debug("Embedding generated successfully") -# return embedding -# except Exception as e: -# logger.error(f"Error generating embedding: {str(e)}") -# raise - -# def add_swarm_type(self, swarm_type: SwarmType): -# """ -# Adds a swarm type to the list of swarm types, generating an embedding for its description. - -# Args: -# swarm_type (SwarmType): The swarm type to add. -# """ -# logger.debug(f"Adding swarm type: {swarm_type.name}") -# try: -# embedding = self.get_embedding(swarm_type.description) -# swarm_type.embedding = embedding.tolist() -# self.swarm_types.append(swarm_type) -# logger.info(f"Added swarm type: {swarm_type.name}") -# except Exception as e: -# logger.error( -# f"Error adding swarm type {swarm_type.name}: {str(e)}" -# ) -# raise - -# def find_best_match(self, task: str) -> Tuple[str, float]: -# """ -# Finds the best match for a given task among the registered swarm types. - -# Args: -# task (str): The task for which to find the best match. 
- -# Returns: -# Tuple[str, float]: A tuple containing the name of the best matching swarm type and the score. -# """ -# logger.debug(f"Finding best match for task: {task[:50]}...") -# try: -# task_embedding = self.get_embedding(task) -# best_match = None -# best_score = -float("inf") -# for swarm_type in self.swarm_types: -# score = np.dot( -# task_embedding, np.array(swarm_type.embedding) -# ) -# if score > best_score: -# best_score = score -# best_match = swarm_type -# logger.info( -# f"Best match for task: {best_match.name} (score: {best_score})" -# ) -# return best_match.name, float(best_score) -# except Exception as e: -# logger.error( -# f"Error finding best match for task: {str(e)}" -# ) -# raise - -# def auto_select_swarm(self, task: str) -> str: -# """ -# Automatically selects the best swarm type for a given task based on their descriptions. - -# Args: -# task (str): The task for which to select a swarm type. - -# Returns: -# str: The name of the selected swarm type. -# """ -# logger.debug(f"Auto-selecting swarm for task: {task[:50]}...") -# best_match, score = self.find_best_match(task) -# logger.info(f"Task: {task}") -# logger.info(f"Selected Swarm Type: {best_match}") -# logger.info(f"Confidence Score: {score:.2f}") -# return best_match - -# def run_multiple(self, tasks: List[str], *args, **kwargs) -> str: -# swarms = [] - -# for task in tasks: -# output = self.auto_select_swarm(task) - -# # Append -# swarms.append(output) - -# return swarms - -# def save_swarm_types(self, filename: str): -# """ -# Saves the registered swarm types to a JSON file. - -# Args: -# filename (str): The name of the file to which to save the swarm types. -# """ -# try: -# with open(filename, "w") as f: -# json.dump([st.dict() for st in self.swarm_types], f) -# logger.info(f"Saved swarm types to {filename}") -# except Exception as e: -# logger.error(f"Error saving swarm types: {str(e)}") -# raise - -# def load_swarm_types(self, filename: str): -# """ -# Loads swarm types from a JSON file. - -# Args: -# filename (str): The name of the file from which to load the swarm types. -# """ -# try: -# with open(filename, "r") as f: -# swarm_types_data = json.load(f) -# self.swarm_types = [ -# SwarmType(**st) for st in swarm_types_data -# ] -# logger.info(f"Loaded swarm types from {filename}") -# except Exception as e: -# logger.error(f"Error loading swarm types: {str(e)}") -# raise - - -# def initialize_swarm_types(matcher: SwarmMatcher): -# logger.debug("Initializing swarm types") -# swarm_types = [ -# SwarmType( -# name="AgentRearrange", -# description="Optimize agent order and rearrange flow for multi-step tasks, ensuring efficient task allocation and minimizing bottlenecks. Keywords: orchestration, coordination, pipeline optimization, task scheduling, resource allocation, workflow management, agent organization, process optimization", -# ), -# SwarmType( -# name="MixtureOfAgents", -# description="Combine diverse expert agents for comprehensive analysis, fostering a collaborative approach to problem-solving and leveraging individual strengths. Keywords: multi-agent system, expert collaboration, distributed intelligence, collective problem solving, agent specialization, team coordination, hybrid approaches, knowledge synthesis", -# ), -# SwarmType( -# name="SpreadSheetSwarm", -# description="Collaborative data processing and analysis in a spreadsheet-like environment, facilitating real-time data sharing and visualization. 
Keywords: data analysis, tabular processing, collaborative editing, data transformation, spreadsheet operations, data visualization, real-time collaboration, structured data", -# ), -# SwarmType( -# name="SequentialWorkflow", -# description="Execute tasks in a step-by-step, sequential process workflow, ensuring a logical and methodical approach to task execution. Keywords: linear processing, waterfall methodology, step-by-step execution, ordered tasks, sequential operations, process flow, systematic approach, staged execution", -# ), -# SwarmType( -# name="ConcurrentWorkflow", -# description="Process multiple tasks or data sources concurrently in parallel, maximizing productivity and reducing processing time. Keywords: parallel processing, multi-threading, asynchronous execution, distributed computing, concurrent operations, simultaneous tasks, parallel workflows, scalable processing", -# ), -# # SwarmType( -# # name="HierarchicalSwarm", -# # description="Organize agents in a hierarchical structure with clear reporting lines and delegation of responsibilities. Keywords: management hierarchy, organizational structure, delegation, supervision, chain of command, tiered organization, structured coordination", -# # ), -# # SwarmType( -# # name="AdaptiveSwarm", -# # description="Dynamically adjust agent behavior and swarm configuration based on task requirements and performance feedback. Keywords: dynamic adaptation, self-optimization, feedback loops, learning systems, flexible configuration, responsive behavior, adaptive algorithms", -# # ), -# # SwarmType( -# # name="ConsensusSwarm", -# # description="Achieve group decisions through consensus mechanisms and voting protocols among multiple agents. Keywords: group decision making, voting systems, collective intelligence, agreement protocols, democratic processes, collaborative decisions", -# # ), -# ] - -# for swarm_type in swarm_types: -# matcher.add_swarm_type(swarm_type) -# logger.debug("Swarm types initialized") - - -# def swarm_matcher(task: str, *args, **kwargs): -# """ -# Runs the SwarmMatcher example with predefined tasks and swarm types. 
-# """ -# config = SwarmMatcherConfig() -# matcher = SwarmMatcher(config) -# initialize_swarm_types(matcher) - -# # matcher.save_swarm_types(f"swarm_logs/{uuid4().hex}.json") - -# swarm_type = matcher.auto_select_swarm(task) - -# logger.info(f"{swarm_type}") - -# return swarm_type - - -from typing import List, Tuple, Dict +from typing import List, Tuple, Optional +import numpy as np +import torch +from transformers import AutoTokenizer, AutoModel from pydantic import BaseModel, Field from loguru import logger -from uuid import uuid4 -import chromadb import json from tenacity import retry, stop_after_attempt, wait_exponential class SwarmType(BaseModel): - """A swarm type with its name, description and optional metadata""" - - id: str = Field(default_factory=lambda: str(uuid4())) name: str description: str - metadata: Dict = Field(default_factory=dict) + embedding: Optional[List[float]] = Field( + default=None, exclude=True + ) class SwarmMatcherConfig(BaseModel): - """Configuration for the SwarmMatcher""" - - collection_name: str = "swarm_types" - distance_metric: str = "cosine" # or "l2" or "ip" - embedding_function: str = ( - "sentence-transformers/all-mpnet-base-v2" # Better model than MiniLM + model_name: str = "sentence-transformers/all-MiniLM-L6-v2" + embedding_dim: int = ( + 512 # Dimension of the sentence-transformers model ) - persist_directory: str = "./chroma_db" class SwarmMatcher: """ - An improved swarm matcher that uses ChromaDB for better vector similarity search. - Features: - - Persistent storage of embeddings - - Better vector similarity search with multiple distance metrics - - Improved embedding model - - Metadata filtering capabilities - - Batch operations support + A class for matching tasks to swarm types based on their descriptions. + It utilizes a transformer model to generate embeddings for task and swarm type descriptions, + and then calculates the dot product to find the best match. """ def __init__(self, config: SwarmMatcherConfig): - """Initialize the improved swarm matcher""" - logger.add("swarm_matcher.log", rotation="100 MB") - self.config = config + """ + Initializes the SwarmMatcher with a configuration. - # Initialize ChromaDB client with persistence - self.chroma_client = chromadb.Client() + Args: + config (SwarmMatcherConfig): The configuration for the SwarmMatcher. + """ + logger.add("swarm_matcher_debug.log", level="DEBUG") + logger.debug("Initializing SwarmMatcher") + try: + self.config = config + self.tokenizer = AutoTokenizer.from_pretrained( + config.model_name + ) + self.model = AutoModel.from_pretrained(config.model_name) + self.swarm_types: List[SwarmType] = [] + logger.debug("SwarmMatcher initialized successfully") + except Exception as e: + logger.error(f"Error initializing SwarmMatcher: {str(e)}") + raise - # Get or create collection + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=4, max=10), + ) + def get_embedding(self, text: str) -> np.ndarray: + """ + Generates an embedding for a given text using the configured model. + + Args: + text (str): The text for which to generate an embedding. + + Returns: + np.ndarray: The embedding vector for the text. 
+ """ + logger.debug(f"Getting embedding for text: {text[:50]}...") try: - self.collection = self.chroma_client.get_collection( - name=config.collection_name, + inputs = self.tokenizer( + text, + return_tensors="pt", + padding=True, + truncation=True, + max_length=512, ) - except ValueError: - self.collection = self.chroma_client.create_collection( - name=config.collection_name, - metadata={"hnsw:space": config.distance_metric}, + with torch.no_grad(): + outputs = self.model(**inputs) + embedding = ( + outputs.last_hidden_state.mean(dim=1) + .squeeze() + .numpy() ) + logger.debug("Embedding generated successfully") + return embedding + except Exception as e: + logger.error(f"Error generating embedding: {str(e)}") + raise - logger.info( - f"Initialized SwarmMatcher with collection '{config.collection_name}'" - ) + def add_swarm_type(self, swarm_type: SwarmType): + """ + Adds a swarm type to the list of swarm types, generating an embedding for its description. - def add_swarm_type(self, swarm_type: SwarmType) -> None: - """Add a single swarm type to the collection""" + Args: + swarm_type (SwarmType): The swarm type to add. + """ + logger.debug(f"Adding swarm type: {swarm_type.name}") try: - self.collection.add( - ids=[swarm_type.id], - documents=[swarm_type.description], - metadatas=[ - {"name": swarm_type.name, **swarm_type.metadata} - ], - ) + embedding = self.get_embedding(swarm_type.description) + swarm_type.embedding = embedding.tolist() + self.swarm_types.append(swarm_type) logger.info(f"Added swarm type: {swarm_type.name}") except Exception as e: logger.error( @@ -340,239 +106,472 @@ class SwarmMatcher: ) raise - def add_swarm_types(self, swarm_types: List[SwarmType]) -> None: - """Add multiple swarm types in batch""" + def find_best_match(self, task: str) -> Tuple[str, float]: + """ + Finds the best match for a given task among the registered swarm types. + + Args: + task (str): The task for which to find the best match. + + Returns: + Tuple[str, float]: A tuple containing the name of the best matching swarm type and the score. + """ + logger.debug(f"Finding best match for task: {task[:50]}...") try: - self.collection.add( - ids=[st.id for st in swarm_types], - documents=[st.description for st in swarm_types], - metadatas=[ - {"name": st.name, **st.metadata} - for st in swarm_types - ], + task_embedding = self.get_embedding(task) + best_match = None + best_score = -float("inf") + for swarm_type in self.swarm_types: + score = np.dot( + task_embedding, np.array(swarm_type.embedding) + ) + if score > best_score: + best_score = score + best_match = swarm_type + logger.info( + f"Best match for task: {best_match.name} (score: {best_score})" ) - logger.info(f"Added {len(swarm_types)} swarm types") + return best_match.name, float(best_score) except Exception as e: logger.error( - f"Error adding swarm types in batch: {str(e)}" + f"Error finding best match for task: {str(e)}" ) raise - @retry( - stop=stop_after_attempt(3), - wait=wait_exponential(multiplier=1, min=4, max=10), - ) - def find_best_matches( - self, - task: str, - n_results: int = 3, - score_threshold: float = 0.7, - ) -> List[Tuple[str, float]]: + def auto_select_swarm(self, task: str) -> str: """ - Find the best matching swarm types for a given task - Returns multiple matches with their scores + Automatically selects the best swarm type for a given task based on their descriptions. + + Args: + task (str): The task for which to select a swarm type. + + Returns: + str: The name of the selected swarm type. 
""" - try: - results = self.collection.query( - query_texts=[task], - n_results=n_results, - include=["metadatas", "distances"], - ) + logger.debug(f"Auto-selecting swarm for task: {task[:50]}...") + best_match, score = self.find_best_match(task) + logger.info(f"Task: {task}") + logger.info(f"Selected Swarm Type: {best_match}") + logger.info(f"Confidence Score: {score:.2f}") + return best_match - matches = [] - for metadata, distance in zip( - results["metadatas"][0], results["distances"][0] - ): - # Convert distance to similarity score (1 - normalized_distance) - score = 1 - ( - distance / 2 - ) # Normalize cosine distance to [0,1] - if score >= score_threshold: - matches.append((metadata["name"], score)) + def run_multiple(self, tasks: List[str], *args, **kwargs) -> str: + swarms = [] - logger.info(f"Found {len(matches)} matches for task") - return matches + for task in tasks: + output = self.auto_select_swarm(task) - except Exception as e: - logger.error(f"Error finding matches for task: {str(e)}") - raise + # Append + swarms.append(output) - def auto_select_swarm(self, task: str) -> str: - """ - Automatically select the best swarm type for a task - Returns only the top match - """ - matches = self.find_best_matches(task, n_results=1) - if not matches: - logger.warning("No suitable matches found for task") - return "SequentialWorkflow" # Default fallback - - best_match, score = matches[0] - logger.info( - f"Selected swarm type '{best_match}' with confidence {score:.3f}" - ) - return best_match + return swarms - def run_multiple(self, tasks: List[str]) -> List[str]: - """Process multiple tasks in batch""" - return [self.auto_select_swarm(task) for task in tasks] + def save_swarm_types(self, filename: str): + """ + Saves the registered swarm types to a JSON file. - def save_swarm_types(self, filename: str) -> None: - """Export swarm types to JSON""" + Args: + filename (str): The name of the file to which to save the swarm types. + """ try: - all_data = self.collection.get( - include=["metadatas", "documents"] - ) - swarm_types = [ - SwarmType( - id=id_, - name=metadata["name"], - description=document, - metadata={ - k: v - for k, v in metadata.items() - if k != "name" - }, - ) - for id_, metadata, document in zip( - all_data["ids"], - all_data["metadatas"], - all_data["documents"], - ) - ] - with open(filename, "w") as f: - json.dump( - [st.dict() for st in swarm_types], f, indent=2 - ) + json.dump([st.dict() for st in self.swarm_types], f) logger.info(f"Saved swarm types to {filename}") except Exception as e: logger.error(f"Error saving swarm types: {str(e)}") raise - def load_swarm_types(self, filename: str) -> None: - """Import swarm types from JSON""" + def load_swarm_types(self, filename: str): + """ + Loads swarm types from a JSON file. + + Args: + filename (str): The name of the file from which to load the swarm types. 
+ """ try: with open(filename, "r") as f: swarm_types_data = json.load(f) - swarm_types = [SwarmType(**st) for st in swarm_types_data] - self.add_swarm_types(swarm_types) + self.swarm_types = [ + SwarmType(**st) for st in swarm_types_data + ] logger.info(f"Loaded swarm types from {filename}") except Exception as e: logger.error(f"Error loading swarm types: {str(e)}") raise -def initialize_default_swarm_types(matcher: SwarmMatcher) -> None: - """Initialize the matcher with default swarm types""" +def initialize_swarm_types(matcher: SwarmMatcher): + logger.debug("Initializing swarm types") swarm_types = [ SwarmType( name="AgentRearrange", - description=""" - Optimize agent order and rearrange flow for multi-step tasks, ensuring efficient task allocation - and minimizing bottlenecks. Specialized in orchestration, coordination, pipeline optimization, - task scheduling, resource allocation, workflow management, agent organization, and process optimization. - Best for tasks requiring complex agent interactions and workflow optimization. - """, - metadata={ - "category": "optimization", - "complexity": "high", - }, + description="Optimize agent order and rearrange flow for multi-step tasks, ensuring efficient task allocation and minimizing bottlenecks. Keywords: orchestration, coordination, pipeline optimization, task scheduling, resource allocation, workflow management, agent organization, process optimization", ), SwarmType( name="MixtureOfAgents", - description=""" - Combine diverse expert agents for comprehensive analysis, fostering a collaborative approach - to problem-solving and leveraging individual strengths. Focuses on multi-agent systems, - expert collaboration, distributed intelligence, collective problem solving, agent specialization, - team coordination, hybrid approaches, and knowledge synthesis. Ideal for complex problems - requiring multiple areas of expertise. - """, - metadata={ - "category": "collaboration", - "complexity": "high", - }, + description="Combine diverse expert agents for comprehensive analysis, fostering a collaborative approach to problem-solving and leveraging individual strengths. Keywords: multi-agent system, expert collaboration, distributed intelligence, collective problem solving, agent specialization, team coordination, hybrid approaches, knowledge synthesis", ), SwarmType( name="SpreadSheetSwarm", - description=""" - Collaborative data processing and analysis in a spreadsheet-like environment, facilitating - real-time data sharing and visualization. Specializes in data analysis, tabular processing, - collaborative editing, data transformation, spreadsheet operations, data visualization, - real-time collaboration, and structured data handling. Perfect for data-intensive tasks - requiring structured analysis. - """, - metadata={ - "category": "data_processing", - "complexity": "medium", - }, + description="Collaborative data processing and analysis in a spreadsheet-like environment, facilitating real-time data sharing and visualization. Keywords: data analysis, tabular processing, collaborative editing, data transformation, spreadsheet operations, data visualization, real-time collaboration, structured data", ), SwarmType( name="SequentialWorkflow", - description=""" - Execute tasks in a step-by-step, sequential process workflow, ensuring a logical and methodical - approach to task execution. 
Focuses on linear processing, waterfall methodology, step-by-step - execution, ordered tasks, sequential operations, process flow, systematic approach, and staged - execution. Best for tasks requiring strict order and dependencies. - """, - metadata={"category": "workflow", "complexity": "low"}, + description="Execute tasks in a step-by-step, sequential process workflow, ensuring a logical and methodical approach to task execution. Keywords: linear processing, waterfall methodology, step-by-step execution, ordered tasks, sequential operations, process flow, systematic approach, staged execution", ), SwarmType( name="ConcurrentWorkflow", - description=""" - Process multiple tasks or data sources concurrently in parallel, maximizing productivity - and reducing processing time. Specializes in parallel processing, multi-threading, - asynchronous execution, distributed computing, concurrent operations, simultaneous tasks, - parallel workflows, and scalable processing. Ideal for independent tasks that can be - processed simultaneously. - """, - metadata={"category": "workflow", "complexity": "medium"}, + description="Process multiple tasks or data sources concurrently in parallel, maximizing productivity and reducing processing time. Keywords: parallel processing, multi-threading, asynchronous execution, distributed computing, concurrent operations, simultaneous tasks, parallel workflows, scalable processing", ), + # SwarmType( + # name="HierarchicalSwarm", + # description="Organize agents in a hierarchical structure with clear reporting lines and delegation of responsibilities. Keywords: management hierarchy, organizational structure, delegation, supervision, chain of command, tiered organization, structured coordination", + # ), + # SwarmType( + # name="AdaptiveSwarm", + # description="Dynamically adjust agent behavior and swarm configuration based on task requirements and performance feedback. Keywords: dynamic adaptation, self-optimization, feedback loops, learning systems, flexible configuration, responsive behavior, adaptive algorithms", + # ), + # SwarmType( + # name="ConsensusSwarm", + # description="Achieve group decisions through consensus mechanisms and voting protocols among multiple agents. Keywords: group decision making, voting systems, collective intelligence, agreement protocols, democratic processes, collaborative decisions", + # ), ] - matcher.add_swarm_types(swarm_types) - logger.info("Initialized default swarm types") + for swarm_type in swarm_types: + matcher.add_swarm_type(swarm_type) + logger.debug("Swarm types initialized") -def create_swarm_matcher( - persist_dir: str = "./chroma_db", - collection_name: str = "swarm_types", -) -> SwarmMatcher: - """Convenience function to create and initialize a swarm matcher""" - config = SwarmMatcherConfig( - persist_directory=persist_dir, collection_name=collection_name - ) +def swarm_matcher(task: str, *args, **kwargs): + """ + Runs the SwarmMatcher example with predefined tasks and swarm types. 
+ """ + config = SwarmMatcherConfig() matcher = SwarmMatcher(config) - initialize_default_swarm_types(matcher) - return matcher + initialize_swarm_types(matcher) - -# Example usage -def swarm_matcher(task: str) -> str: - # Create and initialize matcher - matcher = create_swarm_matcher() + # matcher.save_swarm_types(f"swarm_logs/{uuid4().hex}.json") swarm_type = matcher.auto_select_swarm(task) - print(f"Task: {task}\nSelected Swarm: {swarm_type}\n") + + logger.info(f"{swarm_type}") return swarm_type +# from typing import List, Tuple, Dict +# from pydantic import BaseModel, Field +# from loguru import logger +# from uuid import uuid4 +# import chromadb +# import json +# from tenacity import retry, stop_after_attempt, wait_exponential + + +# class SwarmType(BaseModel): +# """A swarm type with its name, description and optional metadata""" + +# id: str = Field(default_factory=lambda: str(uuid4())) +# name: str +# description: str +# metadata: Dict = Field(default_factory=dict) + + +# class SwarmMatcherConfig(BaseModel): +# """Configuration for the SwarmMatcher""" + +# collection_name: str = "swarm_types" +# distance_metric: str = "cosine" # or "l2" or "ip" +# embedding_function: str = ( +# "sentence-transformers/all-mpnet-base-v2" # Better model than MiniLM +# ) +# persist_directory: str = "./chroma_db" + + +# class SwarmMatcher: +# """ +# An improved swarm matcher that uses ChromaDB for better vector similarity search. +# Features: +# - Persistent storage of embeddings +# - Better vector similarity search with multiple distance metrics +# - Improved embedding model +# - Metadata filtering capabilities +# - Batch operations support +# """ + +# def __init__(self, config: SwarmMatcherConfig): +# """Initialize the improved swarm matcher""" +# logger.add("swarm_matcher.log", rotation="100 MB") +# self.config = config + +# # Initialize ChromaDB client with persistence +# self.chroma_client = chromadb.Client() + +# # Get or create collection +# try: +# self.collection = self.chroma_client.get_collection( +# name=config.collection_name, +# ) +# except ValueError: +# self.collection = self.chroma_client.create_collection( +# name=config.collection_name, +# metadata={"hnsw:space": config.distance_metric}, +# ) + +# logger.info( +# f"Initialized SwarmMatcher with collection '{config.collection_name}'" +# ) + +# def add_swarm_type(self, swarm_type: SwarmType) -> None: +# """Add a single swarm type to the collection""" +# try: +# self.collection.add( +# ids=[swarm_type.id], +# documents=[swarm_type.description], +# metadatas=[ +# {"name": swarm_type.name, **swarm_type.metadata} +# ], +# ) +# logger.info(f"Added swarm type: {swarm_type.name}") +# except Exception as e: +# logger.error( +# f"Error adding swarm type {swarm_type.name}: {str(e)}" +# ) +# raise + +# def add_swarm_types(self, swarm_types: List[SwarmType]) -> None: +# """Add multiple swarm types in batch""" +# try: +# self.collection.add( +# ids=[st.id for st in swarm_types], +# documents=[st.description for st in swarm_types], +# metadatas=[ +# {"name": st.name, **st.metadata} +# for st in swarm_types +# ], +# ) +# logger.info(f"Added {len(swarm_types)} swarm types") +# except Exception as e: +# logger.error( +# f"Error adding swarm types in batch: {str(e)}" +# ) +# raise + +# @retry( +# stop=stop_after_attempt(3), +# wait=wait_exponential(multiplier=1, min=4, max=10), +# ) +# def find_best_matches( +# self, +# task: str, +# n_results: int = 3, +# score_threshold: float = 0.7, +# ) -> List[Tuple[str, float]]: +# """ +# Find the best matching 
swarm types for a given task +# Returns multiple matches with their scores +# """ +# try: +# results = self.collection.query( +# query_texts=[task], +# n_results=n_results, +# include=["metadatas", "distances"], +# ) + +# matches = [] +# for metadata, distance in zip( +# results["metadatas"][0], results["distances"][0] +# ): +# # Convert distance to similarity score (1 - normalized_distance) +# score = 1 - ( +# distance / 2 +# ) # Normalize cosine distance to [0,1] +# if score >= score_threshold: +# matches.append((metadata["name"], score)) + +# logger.info(f"Found {len(matches)} matches for task") +# return matches + +# except Exception as e: +# logger.error(f"Error finding matches for task: {str(e)}") +# raise + +# def auto_select_swarm(self, task: str) -> str: +# """ +# Automatically select the best swarm type for a task +# Returns only the top match +# """ +# matches = self.find_best_matches(task, n_results=1) +# if not matches: +# logger.warning("No suitable matches found for task") +# return "SequentialWorkflow" # Default fallback + +# best_match, score = matches[0] +# logger.info( +# f"Selected swarm type '{best_match}' with confidence {score:.3f}" +# ) +# return best_match + +# def run_multiple(self, tasks: List[str]) -> List[str]: +# """Process multiple tasks in batch""" +# return [self.auto_select_swarm(task) for task in tasks] + +# def save_swarm_types(self, filename: str) -> None: +# """Export swarm types to JSON""" +# try: +# all_data = self.collection.get( +# include=["metadatas", "documents"] +# ) +# swarm_types = [ +# SwarmType( +# id=id_, +# name=metadata["name"], +# description=document, +# metadata={ +# k: v +# for k, v in metadata.items() +# if k != "name" +# }, +# ) +# for id_, metadata, document in zip( +# all_data["ids"], +# all_data["metadatas"], +# all_data["documents"], +# ) +# ] + +# with open(filename, "w") as f: +# json.dump( +# [st.dict() for st in swarm_types], f, indent=2 +# ) +# logger.info(f"Saved swarm types to {filename}") +# except Exception as e: +# logger.error(f"Error saving swarm types: {str(e)}") +# raise + +# def load_swarm_types(self, filename: str) -> None: +# """Import swarm types from JSON""" +# try: +# with open(filename, "r") as f: +# swarm_types_data = json.load(f) +# swarm_types = [SwarmType(**st) for st in swarm_types_data] +# self.add_swarm_types(swarm_types) +# logger.info(f"Loaded swarm types from {filename}") +# except Exception as e: +# logger.error(f"Error loading swarm types: {str(e)}") +# raise + + +# def initialize_default_swarm_types(matcher: SwarmMatcher) -> None: +# """Initialize the matcher with default swarm types""" +# swarm_types = [ +# SwarmType( +# name="AgentRearrange", +# description=""" +# Optimize agent order and rearrange flow for multi-step tasks, ensuring efficient task allocation +# and minimizing bottlenecks. Specialized in orchestration, coordination, pipeline optimization, +# task scheduling, resource allocation, workflow management, agent organization, and process optimization. +# Best for tasks requiring complex agent interactions and workflow optimization. +# """, +# metadata={ +# "category": "optimization", +# "complexity": "high", +# }, +# ), +# SwarmType( +# name="MixtureOfAgents", +# description=""" +# Combine diverse expert agents for comprehensive analysis, fostering a collaborative approach +# to problem-solving and leveraging individual strengths. 
Focuses on multi-agent systems, +# expert collaboration, distributed intelligence, collective problem solving, agent specialization, +# team coordination, hybrid approaches, and knowledge synthesis. Ideal for complex problems +# requiring multiple areas of expertise. +# """, +# metadata={ +# "category": "collaboration", +# "complexity": "high", +# }, +# ), +# SwarmType( +# name="SpreadSheetSwarm", +# description=""" +# Collaborative data processing and analysis in a spreadsheet-like environment, facilitating +# real-time data sharing and visualization. Specializes in data analysis, tabular processing, +# collaborative editing, data transformation, spreadsheet operations, data visualization, +# real-time collaboration, and structured data handling. Perfect for data-intensive tasks +# requiring structured analysis. +# """, +# metadata={ +# "category": "data_processing", +# "complexity": "medium", +# }, +# ), +# SwarmType( +# name="SequentialWorkflow", +# description=""" +# Execute tasks in a step-by-step, sequential process workflow, ensuring a logical and methodical +# approach to task execution. Focuses on linear processing, waterfall methodology, step-by-step +# execution, ordered tasks, sequential operations, process flow, systematic approach, and staged +# execution. Best for tasks requiring strict order and dependencies. +# """, +# metadata={"category": "workflow", "complexity": "low"}, +# ), +# SwarmType( +# name="ConcurrentWorkflow", +# description=""" +# Process multiple tasks or data sources concurrently in parallel, maximizing productivity +# and reducing processing time. Specializes in parallel processing, multi-threading, +# asynchronous execution, distributed computing, concurrent operations, simultaneous tasks, +# parallel workflows, and scalable processing. Ideal for independent tasks that can be +# processed simultaneously. 
+# """, +# metadata={"category": "workflow", "complexity": "medium"}, +# ), +# ] + +# matcher.add_swarm_types(swarm_types) +# logger.info("Initialized default swarm types") + + +# def create_swarm_matcher( +# persist_dir: str = "./chroma_db", +# collection_name: str = "swarm_types", +# ) -> SwarmMatcher: +# """Convenience function to create and initialize a swarm matcher""" +# config = SwarmMatcherConfig( +# persist_directory=persist_dir, collection_name=collection_name +# ) +# matcher = SwarmMatcher(config) +# initialize_default_swarm_types(matcher) +# return matcher + + # # Example usage -# if __name__ == "__main__": +# def swarm_matcher(task: str) -> str: # # Create and initialize matcher # matcher = create_swarm_matcher() -# # Example tasks -# tasks = [ -# "Analyze this spreadsheet of sales data and create visualizations", -# "Coordinate multiple AI agents to solve a complex problem", -# "Process these tasks one after another in a specific order", -# "Write multiple blog posts about the latest advancements in swarm intelligence all at once", -# "Write a blog post about the latest advancements in swarm intelligence", -# ] +# swarm_type = matcher.auto_select_swarm(task) +# print(f"Task: {task}\nSelected Swarm: {swarm_type}\n") + +# return swarm_type + -# # Process tasks -# for task in tasks: -# swarm_type = matcher.auto_select_swarm(task) -# print(f"Task: {task}\nSelected Swarm: {swarm_type}\n") +# # # Example usage +# # if __name__ == "__main__": +# # # Create and initialize matcher +# # matcher = create_swarm_matcher() + +# # # Example tasks +# # tasks = [ +# # "Analyze this spreadsheet of sales data and create visualizations", +# # "Coordinate multiple AI agents to solve a complex problem", +# # "Process these tasks one after another in a specific order", +# # "Write multiple blog posts about the latest advancements in swarm intelligence all at once", +# # "Write a blog post about the latest advancements in swarm intelligence", +# # ] + +# # # Process tasks +# # for task in tasks: +# # swarm_type = matcher.auto_select_swarm(task) +# # print(f"Task: {task}\nSelected Swarm: {swarm_type}\n") diff --git a/swarms/structs/swarm_router.py b/swarms/structs/swarm_router.py index 7d1179d9..e14ff9e7 100644 --- a/swarms/structs/swarm_router.py +++ b/swarms/structs/swarm_router.py @@ -2,18 +2,22 @@ import uuid from datetime import datetime from typing import Any, Callable, Dict, List, Literal, Union +from doc_master import doc_master from loguru import logger from pydantic import BaseModel, Field +from tenacity import retry, stop_after_attempt, wait_fixed +from swarms.prompts.ag_prompt import aggregator_system_prompt from swarms.structs.agent import Agent from swarms.structs.concurrent_workflow import ConcurrentWorkflow from swarms.structs.mixture_of_agents import MixtureOfAgents from swarms.structs.rearrange import AgentRearrange from swarms.structs.sequential_workflow import SequentialWorkflow from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm -from tenacity import retry, stop_after_attempt, wait_fixed from swarms.structs.swarm_matcher import swarm_matcher -from swarms.prompts.ag_prompt import aggregator_system_prompt +from swarms.utils.wrapper_clusterop import ( + exec_callable_with_clusterops, +) SwarmType = Literal[ "AgentRearrange", @@ -25,6 +29,11 @@ SwarmType = Literal[ ] +class Document(BaseModel): + file_path: str + data: str + + class SwarmLog(BaseModel): """ A Pydantic model to capture log entries. 
@@ -37,36 +46,78 @@ class SwarmLog(BaseModel): swarm_type: SwarmType task: str = "" metadata: Dict[str, Any] = Field(default_factory=dict) + documents: List[Document] = [] class SwarmRouter: """ - A class to dynamically route tasks to different swarm types based on user selection or automatic matching. + A class that dynamically routes tasks to different swarm types based on user selection or automatic matching. + + The SwarmRouter enables flexible task execution by either using a specified swarm type or automatically determining + the most suitable swarm type for a given task. It handles task execution while managing logging, type validation, + and metadata capture. - This class enables users to specify a swarm type or let the system automatically determine the best swarm type for a given task. It then runs the task on the selected or matched swarm type, ensuring type validation, logging, and metadata capture. + Args: + name (str, optional): Name identifier for the SwarmRouter instance. Defaults to "swarm-router". + description (str, optional): Description of the SwarmRouter's purpose. Defaults to "Routes your task to the desired swarm". + max_loops (int, optional): Maximum number of execution loops. Defaults to 1. + agents (List[Union[Agent, Callable]], optional): List of Agent objects or callables to use. Defaults to empty list. + swarm_type (SwarmType, optional): Type of swarm to use. Defaults to "SequentialWorkflow". + autosave (bool, optional): Whether to enable autosaving. Defaults to False. + flow (str, optional): Flow configuration string. Defaults to None. + return_json (bool, optional): Whether to return results as JSON. Defaults to False. + auto_generate_prompts (bool, optional): Whether to auto-generate agent prompts. Defaults to False. + shared_memory_system (Any, optional): Shared memory system for agents. Defaults to None. + rules (str, optional): Rules to inject into every agent. Defaults to None. + documents (List[str], optional): List of document file paths to use. Defaults to empty list. + output_type (str, optional): Output format type. Defaults to "string". Attributes: - name (str): The name of the SwarmRouter instance. - description (str): A description of the SwarmRouter instance. - max_loops (int): The maximum number of loops to perform. - agents (List[Union[Agent, Callable]]): A list of Agent objects to be used in the swarm. - swarm_type (SwarmType): The type of swarm to be used, which can be specified or automatically determined. - autosave (bool): A flag to enable/disable autosave. - flow (str): The flow of the swarm. - return_json (bool): A flag to enable/disable returning the result in JSON format. - auto_generate_prompts (bool): A flag to enable/disable auto generation of prompts. - swarm (Union[AgentRearrange, MixtureOfAgents, SpreadSheetSwarm, SequentialWorkflow, ConcurrentWorkflow]): - The instantiated swarm object. - logs (List[SwarmLog]): A list of log entries captured during operations. - auto_generate_prompt (bool): A flag to enable/disable auto generation of prompts. 
+ name (str): Name identifier for the SwarmRouter instance + description (str): Description of the SwarmRouter's purpose + max_loops (int): Maximum number of execution loops + agents (List[Union[Agent, Callable]]): List of Agent objects or callables + swarm_type (SwarmType): Type of swarm being used + autosave (bool): Whether autosaving is enabled + flow (str): Flow configuration string + return_json (bool): Whether results are returned as JSON + auto_generate_prompts (bool): Whether prompt auto-generation is enabled + shared_memory_system (Any): Shared memory system for agents + rules (str): Rules injected into every agent + documents (List[str]): List of document file paths + output_type (str): Output format type + logs (List[SwarmLog]): List of execution logs + swarm: The instantiated swarm object Available Swarm Types: - - AgentRearrange: Rearranges agents for optimal task execution. - - MixtureOfAgents: Combines different types of agents for diverse task handling. - - SpreadSheetSwarm: Utilizes spreadsheet-like operations for task management. - - SequentialWorkflow: Executes tasks in a sequential manner. - - ConcurrentWorkflow: Executes tasks concurrently for parallel processing. - - "auto" will automatically conduct embedding search to find the best swarm for your task + - AgentRearrange: Optimizes agent arrangement for task execution + - MixtureOfAgents: Combines multiple agent types for diverse tasks + - SpreadSheetSwarm: Uses spreadsheet-like operations for task management + - SequentialWorkflow: Executes tasks sequentially + - ConcurrentWorkflow: Executes tasks in parallel + - "auto": Automatically selects best swarm type via embedding search + + Methods: + run(task: str, device: str = "cpu", all_cores: bool = False, all_gpus: bool = False, *args, **kwargs) -> Any: + Executes a task using the configured swarm + + batch_run(tasks: List[str], *args, **kwargs) -> List[Any]: + Executes multiple tasks in sequence + + threaded_run(task: str, *args, **kwargs) -> Any: + Executes a task in a separate thread + + async_run(task: str, *args, **kwargs) -> Any: + Executes a task asynchronously + + concurrent_run(task: str, *args, **kwargs) -> Any: + Executes a task using concurrent execution + + concurrent_batch_run(tasks: List[str], *args, **kwargs) -> List[Any]: + Executes multiple tasks concurrently + + get_logs() -> List[SwarmLog]: + Retrieves execution logs """ def __init__( @@ -78,8 +129,12 @@ class SwarmRouter: swarm_type: SwarmType = "SequentialWorkflow", # "SpreadSheetSwarm" # "auto" autosave: bool = False, flow: str = None, - return_json: bool = True, + return_json: bool = False, auto_generate_prompts: bool = False, + shared_memory_system: Any = None, + rules: str = None, + documents: List[str] = [], # A list of docs file paths + output_type: str = "string", # Md, PDF, Txt, csv *args, **kwargs, ): @@ -92,6 +147,10 @@ class SwarmRouter: self.flow = flow self.return_json = return_json self.auto_generate_prompts = auto_generate_prompts + self.shared_memory_system = shared_memory_system + self.rules = rules + self.documents = documents + self.output_type = output_type self.logs = [] self.reliability_check() @@ -101,8 +160,51 @@ class SwarmRouter: f"SwarmRouter initialized with swarm type: {swarm_type}", ) + # Handle Automated Prompt Engineering self.activate_ape() + # Handle shared memory + if self.shared_memory_system is not None: + self.activate_shared_memory() + + # Handle rules + if self.rules is not None: + self.handle_rules() + + # if self.documents is not None: + # 
+            self.handle_docs()
+
+    def handle_docs(self):
+        """Read every configured document and append its text to each agent's system prompt."""
+        # Read and concatenate the contents of every configured document
+        data = "".join(
+            [doc_master(file_path=doc) for doc in self.documents]
+        )
+
+        # Append the combined document text to every agent's prompt
+        doc_prompt = f"##### Documents Available ##### {data}"
+        for agent in self.agents:
+            agent.system_prompt += doc_prompt
+
+    def activate_shared_memory(self):
+        logger.info("Activating shared memory for all agents")
+
+        for agent in self.agents:
+            agent.long_term_memory = self.shared_memory_system
+
+        logger.info("All agents now share the same memory system")
+
+    def handle_rules(self):
+        logger.info("Injecting rules into every agent")
+
+        for agent in self.agents:
+            agent.system_prompt += f"### Swarm Rules ### {self.rules}"
+
+        logger.info("Finished injecting rules")
+
     def activate_ape(self):
         """Activate automatic prompt engineering for agents that support it"""
         try:
@@ -134,7 +236,7 @@ class SwarmRouter:

     @retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
     def reliability_check(self):
-        logger.info("Logger initializing checks")
+        logger.info("Initializing reliability checks")

         if not self.agents:
             raise ValueError("No agents provided for the swarm.")
@@ -143,7 +245,9 @@ class SwarmRouter:
         if self.max_loops == 0:
             raise ValueError("max_loops cannot be 0.")

-        logger.info("Checks completed your swarm is ready.")
+        logger.info(
+            "Reliability checks completed. Your swarm is ready."
+        )

     def _create_swarm(
         self, task: str = None, *args, **kwargs
@@ -182,6 +286,7 @@ class SwarmRouter:
                 max_loops=self.max_loops,
                 flow=self.flow,
                 return_json=self.return_json,
+                output_type=self.output_type,
                 *args,
                 **kwargs,
             )
@@ -206,12 +311,19 @@ class SwarmRouter:
                 *args,
                 **kwargs,
             )
-        elif self.swarm_type == "SequentialWorkflow":
+        elif self.swarm_type in (
+            "SequentialWorkflow",
+            "sequential",
+            "Sequential",
+        ):
             return SequentialWorkflow(
                 name=self.name,
                 description=self.description,
                 agents=self.agents,
                 max_loops=self.max_loops,
+                shared_memory_system=self.shared_memory_system,
+                output_type=self.output_type,
+                return_json=self.return_json,
                 *args,
                 **kwargs,
             )
@@ -227,7 +339,9 @@ class SwarmRouter:
                 **kwargs,
             )
         else:
-            raise ValueError(f"Invalid swarm type: {self.swarm_type}")
+            raise ValueError(
+                f"Invalid swarm type: {self.swarm_type}. Valid types are "
+                "'SequentialWorkflow', 'ConcurrentWorkflow', 'AgentRearrange', "
+                "'MixtureOfAgents', 'SpreadSheetSwarm', and 'auto'."
+            )

     def _log(
         self,
@@ -256,7 +370,7 @@ class SwarmRouter:
         logger.log(level.upper(), message)

     @retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
-    def run(self, task: str, *args, **kwargs) -> Any:
+    def _run(self, task: str, *args, **kwargs) -> Any:
         """
         Dynamically run the specified task on the selected or matched swarm type.

@@ -281,6 +395,7 @@ class SwarmRouter:
                 metadata=kwargs,
             )
             result = self.swarm.run(task, *args, **kwargs)
+
             self._log(
                 "success",
                 f"Task completed successfully on {self.swarm_type} swarm",
@@ -297,6 +412,56 @@ class SwarmRouter:
             )
             raise

+    def run(
+        self,
+        task: str,
+        device: str = "cpu",
+        all_cores: bool = True,
+        all_gpus: bool = False,
+        *args,
+        **kwargs,
+    ) -> Any:
+        """
+        Execute a task on the selected swarm type with the specified compute resources.
+
+        Args:
+            task (str): The task to be executed by the swarm.
+            device (str, optional): Device to run on, "cpu" or "gpu". Defaults to "cpu".
+            all_cores (bool, optional): Whether to use all CPU cores. Defaults to True.
+            all_gpus (bool, optional): Whether to use all available GPUs. Defaults to False.
+            *args: Variable length argument list.
+            **kwargs: Arbitrary keyword arguments.
+
+        Returns:
+            Any: The result of the swarm's execution.
+
+        Raises:
+            Exception: If an error occurs during task execution.
+        """
+        return exec_callable_with_clusterops(
+            func=self._run,
+            device=device,
+            all_cores=all_cores,
+            all_gpus=all_gpus,
+            task=task,
+            *args,
+            **kwargs,
+        )
+
+    def __call__(self, task: str, *args, **kwargs) -> Any:
+        """
+        Make the SwarmRouter instance callable.
+
+        Args:
+            task (str): The task to be executed by the swarm.
+            *args: Variable length argument list.
+            **kwargs: Arbitrary keyword arguments.
+
+        Returns:
+            Any: The result of the swarm's execution.
+        """
+        return self.run(task=task, *args, **kwargs)
+
     def batch_run(
         self, tasks: List[str], *args, **kwargs
     ) -> List[Any]:
@@ -446,15 +611,26 @@ class SwarmRouter:
         Raises:
             Exception: If an error occurs during task execution.
         """
-        from concurrent.futures import ThreadPoolExecutor
+        from concurrent.futures import ThreadPoolExecutor, as_completed

+        results = [None] * len(tasks)
         with ThreadPoolExecutor() as executor:
-            futures = [
-                executor.submit(self.run, task, *args, **kwargs)
-                for task in tasks
-            ]
-            results = [future.result() for future in futures]
-            return results
+            # Submit all tasks, remembering each future's original position
+            future_to_index = {
+                executor.submit(self.run, task, *args, **kwargs): i
+                for i, task in enumerate(tasks)
+            }
+
+            # Process results as they complete while preserving task order
+            for future in as_completed(future_to_index):
+                index = future_to_index[future]
+                try:
+                    results[index] = future.result()
+                except Exception as e:
+                    logger.error(f"Task execution failed: {str(e)}")
+                    results[index] = None
+
+        return results


 def swarm_router(
@@ -468,6 +644,7 @@ def swarm_router(
     return_json: bool = True,
     auto_generate_prompts: bool = False,
     task: str = None,
+    rules: str = None,
     *args,
     **kwargs,
 ) -> SwarmRouter:
@@ -518,11 +695,14 @@ def swarm_router(
             flow=flow,
             return_json=return_json,
             auto_generate_prompts=auto_generate_prompts,
+            rules=rules,
         )

         logger.info(f"Executing task with SwarmRouter: {task}")
         result = swarm_router.run(task, *args, **kwargs)
-        logger.info("Task execution completed successfully")
+        logger.info(
+            f"Task execution completed successfully: {result}"
+        )
         return result

     except ValueError as e:
diff --git a/swarms/utils/add_docs_to_agents.py b/swarms/utils/add_docs_to_agents.py
new file mode 100644
index 00000000..8dbc1df3
--- /dev/null
+++ b/swarms/utils/add_docs_to_agents.py
@@ -0,0 +1,141 @@
+from typing import Any, List, Optional, Union
+from pathlib import Path
+from loguru import logger
+from doc_master import doc_master
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from tenacity import retry, stop_after_attempt, wait_exponential
+
+
+@retry(
+    stop=stop_after_attempt(3),
+    wait=wait_exponential(multiplier=1, min=4, max=10),
+)
+def _process_document(doc_path: Union[str, Path]) -> str:
+    """Safely process a single document with retries.
+
+    Args:
+        doc_path: Path to the document to process
+
+    Returns:
+        Processed document text
+
+    Raises:
+        Exception: If document processing fails after retries
+    """
+    try:
+        return doc_master(
+            file_path=str(doc_path), output_type="string"
+        )
+    except Exception as e:
+        logger.error(
+            f"Error processing document {doc_path}: {str(e)}"
+        )
+        raise
+
+
+def handle_input_docs(
+    agents: Any,
+    docs: Optional[List[Union[str, Path]]] = None,
+    doc_folder: Optional[Union[str, Path]] = None,
+    max_workers: int = 4,
+    chunk_size: int = 1000000,
+) -> Any:
+    """
+    Add document content to agent prompts with improved reliability and performance.
+
+    Args:
+        agents: Dictionary mapping agent names to Agent objects
+        docs: List of document paths
+        doc_folder: Path to folder containing documents
+        max_workers: Maximum number of parallel document processing workers
+        chunk_size: Maximum characters to process at once to avoid memory issues
+
+    Returns:
+        The agents mapping with updated system prompts, or the input unchanged
+        if there are no agents or no documents to process.
+
+    Raises:
+        RuntimeError: If document processing fails
+    """
+    if not agents:
+        logger.warning(
+            "No agents provided, skipping document distribution"
+        )
+        return agents
+
+    if not docs and not doc_folder:
+        logger.warning(
+            "No documents or folder provided, skipping document distribution"
+        )
+        return agents
+
+    logger.info("Starting document distribution to agents")
+
+    try:
+        processed_docs = []
+
+        # Process individual documents in parallel
+        if docs:
+            with ThreadPoolExecutor(
+                max_workers=max_workers
+            ) as executor:
+                future_to_doc = {
+                    executor.submit(_process_document, doc): doc
+                    for doc in docs
+                }
+
+                for future in as_completed(future_to_doc):
+                    doc = future_to_doc[future]
+                    try:
+                        processed_docs.append(future.result())
+                    except Exception as e:
+                        logger.error(
+                            f"Failed to process document {doc}: {str(e)}"
+                        )
+                        raise RuntimeError(
+                            f"Document processing failed: {str(e)}"
+                        )
+
+        # Process folder if specified
+        elif doc_folder:
+            try:
+                folder_content = doc_master(
+                    folder_path=str(doc_folder), output_type="string"
+                )
+                processed_docs.append(folder_content)
+            except Exception as e:
+                logger.error(
+                    f"Failed to process folder {doc_folder}: {str(e)}"
+                )
+                raise RuntimeError(
+                    f"Folder processing failed: {str(e)}"
+                )
+
+        # Combine and chunk the processed documents
+        combined_data = "\n".join(processed_docs)
+
+        # Update agent prompts in chunks to avoid memory issues
+        for agent in agents.values():
+            try:
+                for i in range(0, len(combined_data), chunk_size):
+                    chunk = combined_data[i : i + chunk_size]
+                    if i == 0:
+                        agent.system_prompt += (
+                            "\nDocuments:\n" + chunk
+                        )
+                    else:
+                        agent.system_prompt += chunk
+            except Exception as e:
+                logger.error(
+                    f"Failed to update agent prompt: {str(e)}"
+                )
+                raise RuntimeError(
+                    f"Agent prompt update failed: {str(e)}"
+                )
+
+        logger.info(
+            f"Successfully added documents to {len(agents)} agents"
+        )
+
+        return agents
+
+    except Exception as e:
+        logger.error(f"Document distribution failed: {str(e)}")
+        raise RuntimeError(f"Document distribution failed: {str(e)}")
diff --git a/swarms/utils/any_to_str.py b/swarms/utils/any_to_str.py
new file mode 100644
index 00000000..125e233e
--- /dev/null
+++ b/swarms/utils/any_to_str.py
@@ -0,0 +1,102 @@
+from typing import Union, Dict, List, Tuple, Any
+
+
+def any_to_str(data: Union[str, Dict, List, Tuple, Any]) -> str:
+    """Convert any input data type to a nicely formatted string.
+
+    This function handles conversion of various Python data types into a clean string representation.
+    It recursively processes nested data structures and handles None values gracefully.
+
+    Args:
+        data: Input data of any type to convert to string. Can be:
+            - Dictionary
+            - List/Tuple
+            - String
+            - None
+            - Any other type that can be converted via str()
+
+    Returns:
+        str: A formatted string representation of the input data.
+            - Dictionaries are formatted as "key: value" pairs separated by newlines
+            - Lists/tuples are comma-separated and wrapped in brackets/parentheses
+            - Strings are wrapped in double quotes
+            - None is rendered as the string "None"
+            - Other types are converted using str()
+
+    Examples:
+        >>> any_to_str({'a': 1, 'b': 2})
+        'a: 1\nb: 2'
+        >>> any_to_str([1, 2, 3])
+        '[1, 2, 3]'
+        >>> any_to_str(None)
+        'None'
+    """
+    try:
+        if isinstance(data, dict):
+            # Format dictionary as newline-separated "key: value" pairs
+            items = []
+            for k, v in data.items():
+                value = any_to_str(v)
+                items.append(f"{k}: {value}")
+            return "\n".join(items)
+
+        elif isinstance(data, (list, tuple)):
+            # Format sequences with brackets/parentheses and proper spacing
+            items = [any_to_str(x) for x in data]
+            if len(items) == 0:
+                return "[]" if isinstance(data, list) else "()"
+            return (
+                f"[{', '.join(items)}]"
+                if isinstance(data, list)
+                else f"({', '.join(items)})"
+            )
+
+        elif data is None:
+            return "None"
+
+        else:
+            # Quote strings; convert any other type with str()
+            if isinstance(data, str):
+                return f'"{data}"'
+            return str(data)
+
+    except Exception as e:
+        return f"Error converting data: {str(e)}"
+
+
+def main():
+    # Example 1: Dictionary
+    print("Dictionary:")
+    print(
+        any_to_str(
+            {
+                "name": "John",
+                "age": 30,
+                "hobbies": ["reading", "hiking"],
+            }
+        )
+    )
+
+    print("\nNested Dictionary:")
+    print(
+        any_to_str(
+            {
+                "user": {
+                    "id": 123,
+                    "details": {"city": "New York", "active": True},
+                },
+                "data": [1, 2, 3],
+            }
+        )
+    )
+
+    print("\nList and Tuple:")
+    print(any_to_str([1, "text", None, (1, 2)]))
+    print(any_to_str((True, False, None)))
+
+    print("\nEmpty Collections:")
+    print(any_to_str([]))
+    print(any_to_str({}))
+
+
+if __name__ == "__main__":
+    main()
diff --git a/swarms/utils/swarm_output_handling.py b/swarms/utils/swarm_output_handling.py
new file mode 100644
index 00000000..d7549100
--- /dev/null
+++ b/swarms/utils/swarm_output_handling.py
@@ -0,0 +1,34 @@
+from typing import Union, Dict, List
+from swarms.artifacts.main_artifact import Artifact
+
+
+def handle_artifact_outputs(
+    file_path: str,
+    data: Union[str, Dict, List],
+    output_type: str = "txt",
+    folder_path: str = "./artifacts",
+) -> str:
+    """
+    Handle different types of data and create files in various formats.
+
+    Args:
+        file_path: Path where the file should be saved
+        data: Input data that can be a string, dict, or list
+        output_type: Type of output file (txt, md, pdf, csv, json)
+        folder_path: Folder to save artifacts in
+
+    Returns:
+        str: Path to the created file
+    """
+    # Create an artifact with the appropriate file type
+    artifact = Artifact(
+        folder_path=folder_path,
+        file_path=file_path,
+        file_type=output_type,
+        contents=data,
+        edit_count=0,
+    )
+
+    # Save the file in the requested output format and return its path
+    artifact.save_as(output_format=output_type)
+    return file_path
diff --git a/swarms/utils/swarm_reliability_checks.py b/swarms/utils/swarm_reliability_checks.py
new file mode 100644
index 00000000..46145859
--- /dev/null
+++ b/swarms/utils/swarm_reliability_checks.py
@@ -0,0 +1,78 @@
+from loguru import logger
+from typing import List, Union, Callable, Optional
+from swarms.structs.agent import Agent
+
+
+def reliability_check(
+    agents: List[Union[Agent, Callable]],
+    max_loops: int,
+    name: Optional[str] = None,
+    description: Optional[str] = None,
+    flow: Optional[str] = None,
+) -> None:
+    """
+    Performs reliability checks on swarm configuration parameters.
+
+    Args:
+        agents: List of Agent objects or callables that will be executed
+        max_loops: Maximum number of execution loops
+        name: Name identifier for the swarm
+        description: Description of the swarm's purpose
+        flow: Flow configuration string defining agent interactions
+
+    Raises:
+        ValueError: If any parameters fail validation checks
+        TypeError: If parameters are of incorrect type
+    """
+    logger.info("Initializing swarm reliability checks")
+
+    # Type checking
+    if not isinstance(agents, list):
+        raise TypeError("agents parameter must be a list")
+
+    if not isinstance(max_loops, int):
+        raise TypeError("max_loops must be an integer")
+
+    # Validate agents
+    if not agents:
+        raise ValueError("Agents list cannot be empty")
+
+    for i, agent in enumerate(agents):
+        if not isinstance(agent, (Agent, Callable)):
+            raise TypeError(
+                f"Agent at index {i} must be an Agent instance or Callable"
+            )
+
+    # Validate max_loops
+    if max_loops <= 0:
+        raise ValueError("max_loops must be greater than 0")
+
+    if max_loops > 1000:
+        logger.warning(
+            "Large max_loops value detected. This may impact performance."
+        )
+
+    # Validate name
+    if name is None:
+        raise ValueError("name parameter is required")
+    if not isinstance(name, str):
+        raise TypeError("name must be a string")
+    if len(name.strip()) == 0:
+        raise ValueError("name cannot be empty or just whitespace")
+
+    # Validate description
+    if description is None:
+        raise ValueError("description parameter is required")
+    if not isinstance(description, str):
+        raise TypeError("description must be a string")
+    if len(description.strip()) == 0:
+        raise ValueError(
+            "description cannot be empty or just whitespace"
+        )
+
+    # Validate flow
+    if flow is None:
+        raise ValueError("flow parameter is required")
+    if not isinstance(flow, str):
+        raise TypeError("flow must be a string")
+
+    logger.info("All reliability checks passed successfully")
diff --git a/swarms/utils/wrapper_clusterop.py b/swarms/utils/wrapper_clusterop.py
new file mode 100644
index 00000000..3ee8d3e4
--- /dev/null
+++ b/swarms/utils/wrapper_clusterop.py
@@ -0,0 +1,77 @@
+import os
+from typing import Any
+
+from clusterops import (
+    execute_on_gpu,
+    execute_on_multiple_gpus,
+    execute_with_cpu_cores,
+    list_available_gpus,
+)
+from loguru import logger
+
+
+def exec_callable_with_clusterops(
+    device: str = "cpu",
+    device_id: int = 0,
+    all_cores: bool = True,
+    all_gpus: bool = False,
+    func: callable = None,
+    *args,
+    **kwargs,
+) -> Any:
+    """
+    Executes a given function on a specified device, either CPU or GPU.
+
+    Logs the device selection and the number of cores or the GPU ID used. If the device is set to CPU, it can use all available cores or the core count given by `device_id`. If the device is set to GPU, it uses the GPU specified by `device_id`, or all available GPUs when `all_gpus` is True.
+
+    Args:
+        device (str, optional): The device to use for execution. Defaults to "cpu".
+        device_id (int, optional): The ID of the GPU to use if device is set to "gpu". Defaults to 0.
+        all_cores (bool, optional): If True, uses all available CPU cores. Defaults to True.
+        all_gpus (bool, optional): If True, uses all available GPUs. Defaults to False.
+        func (callable): The function to execute.
+        *args: Additional positional arguments to be passed to the execution method.
+        **kwargs: Additional keyword arguments to be passed to the execution method.
+
+    Returns:
+        Any: The result of the execution.
+
+    Raises:
+        ValueError: If an invalid device is specified.
+        Exception: If any other error occurs during execution.
+    """
+    try:
+        logger.info(f"Attempting to run on device: {device}")
+        if device == "cpu":
+            logger.info("Device set to CPU")
+            if all_cores is True:
+                count = os.cpu_count()
+                logger.info(f"Using all available CPU cores: {count}")
+            else:
+                count = device_id
+                logger.info(f"Using specific CPU core count: {count}")
+
+            return execute_with_cpu_cores(
+                count, func, *args, **kwargs
+            )
+
+        # Check the multi-GPU flag before the single-GPU branch;
+        # otherwise the multi-GPU branch is unreachable
+        elif device == "gpu" and all_gpus is True:
+            logger.info("Device set to GPU, running on all GPUs")
+            gpus = [int(gpu) for gpu in list_available_gpus()]
+            return execute_on_multiple_gpus(
+                gpus, func, *args, **kwargs
+            )
+        elif device == "gpu":
+            logger.info("Device set to GPU")
+            return execute_on_gpu(device_id, func, *args, **kwargs)
+        else:
+            raise ValueError(
+                f"Invalid device specified: {device}. Supported devices are 'cpu' and 'gpu'."
+            )
+    except ValueError as e:
+        logger.error(f"Invalid device specified: {e}")
+        raise e
+    except Exception as e:
+        logger.error(f"An error occurred during execution: {e}")
+        raise e
diff --git a/tool_builder.py b/tool_builder.py
deleted file mode 100644
index c4219060..00000000
--- a/tool_builder.py
+++ /dev/null
@@ -1,170 +0,0 @@
-import os
-from pydantic import BaseModel, Field
-from swarm_models import OpenAIFunctionCaller
-from dotenv import load_dotenv
-from typing import Any
-from swarms.utils.loguru_logger import logger
-from swarms.tools.prebuilt.code_executor import CodeExecutor
-
-load_dotenv()
-
-
-class Tool(BaseModel):
-    id: str = Field(
-        description="A unique identifier for the task. This should be a short, descriptive name that captures the main purpose of the task. Use - to separate words and make it lowercase."
-    )
-    plan: str = Field(
-        description="The comprehensive plan detailing how the task will accomplish the given task. This should include the high-level strategy, key milestones, and expected outcomes. The plan should clearly articulate what the overall goal is, what success looks like, and how progress will be measured throughout execution."
-    )
-    failures_prediction: str = Field(
-        description="A thorough analysis of potential failure modes and mitigation strategies. This should identify technical risks, edge cases, error conditions, and possible points of failure in the task. For each identified risk, include specific preventive measures, fallback approaches, and recovery procedures to ensure robustness and reliability."
-    )
-    rationale: str = Field(
-        description="The detailed reasoning and justification for why this specific task design is optimal for the given task. This should explain the key architectural decisions, tradeoffs considered, alternatives evaluated, and why this approach best satisfies the requirements. Include both technical and business factors that influenced the design."
-    )
-    code: str = Field(
-        description="Generate the code for the task. This should be a python function that takes in a task and returns a result. The code should be a complete and working implementation of the task. Include all necessary imports and dependencies and add types, docstrings, and comments to the code. Make sure the main code executes successfully. No placeholders or comments. Make sure the main function executes successfully."
-    )
-
-
-def setup_model(base_model: BaseModel = Tool):
-    model = OpenAIFunctionCaller(
-        system_prompt="""You are an expert Python developer specializing in building reliable API integrations and developer tools. Your role is to generate production-ready code that follows best practices for API interactions and tool development.
-
-        When given a task, you will:
-        1. Design robust error handling and retry mechanisms for API calls
-        2. Implement proper authentication and security measures
-        3. Structure code for maintainability and reusability
-        4. Add comprehensive logging and monitoring
-        5. Include detailed type hints and documentation
-        6. Write unit tests to verify functionality
-
-        Your code should follow these principles:
-        - Use modern Python features and idioms
-        - Handle rate limits and API quotas gracefully
-        - Validate inputs and outputs thoroughly
-        - Follow security best practices for API keys and secrets
-        - Include clear error messages and debugging info
-        - Be well-documented with docstrings and comments
-        - Use appropriate design patterns
-        - Follow PEP 8 style guidelines
-
-        The generated code should be complete, tested, and ready for production use. Include all necessary imports, error handling, and helper functions.
-        """,
-        base_model=base_model,
-        openai_api_key=os.getenv("OPENAI_API_KEY"),
-        temperature=0.5,
-    )
-    return model
-
-
-def generate_tool(task: str) -> Any:
-    model = setup_model()
-    response = model.run(task)
-    logger.info(f"Response: {response}")
-
-    # If response is a dict, get code directly
-    if isinstance(response, dict):
-        # return response.get("code", "")
-        code = response.get("code", "")
-        logger.info(f"Code: {code}")
-        return code
-    # If response is a Tool object, access code attribute
-    elif isinstance(response, Tool):
-        code = response.code
-        logger.info(f"Code: {code}")
-        return code
-    # If response is a string (raw code)
-    elif isinstance(response, str):
-        code = response
-        logger.info(f"Code: {code}")
-        return code
-    logger.error(f"Unexpected response type: {type(response)}")
-    return ""
-
-
-def execute_generated_code(code: str) -> Any:
-    """
-    Attempts to execute the generated Python code, handling errors and retrying if necessary.
-
-    Args:
-        code (str): The Python code to be executed.
-
-    Returns:
-        Any: Output of the code execution, or error details if execution fails.
-    """
-    logger.info("Starting code execution")
-    try:
-        exec_namespace = {}
-        exec(code, exec_namespace)
-
-        # Check for any callable functions in the namespace
-        main_function = None
-        for item in exec_namespace.values():
-            if callable(item) and not item.__name__.startswith("__"):
-                main_function = item
-                break
-
-        if main_function:
-            result = main_function()
-            logger.info(
-                f"Code execution successful. Function result: {result}"
-            )
-            return result
-        elif "result" in exec_namespace:
-            logger.info(
-                f"Code execution successful. Result variable: {exec_namespace['result']}"
-            )
-            return exec_namespace["result"]
-        else:
-            logger.warning(
-                "Code execution completed but no result found"
-            )
-            return "No result or function found in executed code."
-    except Exception as e:
-        logger.error(
-            f"Code execution failed with error: {str(e)}",
-            exc_info=True,
-        )
-        return e
-
-
-def retry_until_success(task: str, max_retries: int = 5):
-    """
-    Generates and executes code until the execution is successful.
-
-    Args:
-        task (str): Task description to generate the required code.
-    """
-    attempts = 0
-
-    while attempts < max_retries:
-        logger.info(f"Attempt {attempts + 1} of {max_retries}")
-        tool = generate_tool(task)
-        logger.debug(f"Generated code:\n{tool}")
-
-        # result = execute_generated_code(tool)
-        result = CodeExecutor().execute(code=tool)
-        logger.info(f"Result: {result}")
-
-        if isinstance(result, Exception):
-            logger.error(
-                f"Attempt {attempts + 1} failed: {str(result)}"
-            )
-            print("Retrying with updated code...")
-            attempts += 1
-        else:
-            logger.info(
-                f"Success on attempt {attempts + 1}. Result: {result}"
-            )
-            print(f"Code executed successfully: {result}")
-            break
-    else:
-        logger.error("Max retries reached. Execution failed.")
-        print("Max retries reached. Execution failed.")
-
-
-# Usage
-retry_until_success(
-    "Write a function to fetch and display weather information from a given API."
-)