pull/643/head
Your Name 2 months ago
parent 9d471c2188
commit c1c98a85af

@ -9,6 +9,7 @@ from swarm_models import OpenAIFunctionCaller, OpenAIChat
from swarms.structs.agent import Agent
from swarms.structs.swarm_router import SwarmRouter
from swarms.utils.loguru_logger import initialize_logger
from swarms.structs.agents_available import showcase_available_agents
logger = initialize_logger(log_folder="auto_swarm_builder")
@ -27,10 +28,10 @@ class AgentConfig(BaseModel):
description="The system prompt that defines the agent's behavior",
example="You are a research agent. Your role is to gather and analyze information...",
)
max_loops: int = Field(
description="Maximum number of reasoning loops the agent can perform",
example=3,
)
# max_loops: int = Field(
# description="Maximum number of reasoning loops the agent can perform",
# example=3,
# )
class SwarmConfig(BaseModel):
@ -214,10 +215,20 @@ class AutoSwarmBuilder:
agent_name=agent_config.name,
agent_description=agent_config.description,
agent_system_prompt=agent_config.system_prompt,
max_loops=agent_config.max_loops,
# max_loops=agent_config.max_loops,
)
agents.append(agent)
# Showcasing available agents
agents_available = showcase_available_agents(
name=self.name,
description=self.description,
agents=agents,
)
for agent in agents:
agent.system_prompt += "\n" + agents_available
return agents
def build_agent(
@ -283,6 +294,8 @@ class AutoSwarmBuilder:
"""
logger.info("Routing task through swarm")
swarm_router_instance = SwarmRouter(
name=self.name,
description=self.description,
agents=agents,
swarm_type="auto",
max_loops=1,
@ -293,10 +306,14 @@ class AutoSwarmBuilder:
)
example = AutoSwarmBuilder()
example = AutoSwarmBuilder(
name="ChipDesign-Swarm",
description="A swarm of specialized AI agents collaborating on chip architecture, logic design, verification, and optimization to create novel semiconductor designs",
max_loops=1,
)
print(
example.run(
"Write multiple blog posts about the latest advancements in swarm intelligence all at once"
"Design a new AI accelerator chip optimized for transformer model inference. Consider the following aspects: 1) Overall chip architecture and block diagram 2) Memory hierarchy and interconnects 3) Processing elements and data flow 4) Power and thermal considerations 5) Physical layout recommendations -> "
)
)

@ -88,12 +88,9 @@ tasks = [
# Run agents with tasks concurrently
results = run_agents_with_tasks_concurrently(
agents,
tasks,
all_cores=True,
device="cpu",
agents, tasks, all_cores=True, device="cpu", no_clusterops=True
)
# Print the results
for result in results:
print(result)
# for result in results:
# print(result)

@ -0,0 +1,54 @@
import pandas as pd
import json
from loguru import logger
def dict_to_dataframe(data: dict) -> pd.DataFrame:
    """Convert a dictionary into a two-column Pandas DataFrame.

    Each value is rendered to a display string: lists become
    comma-separated strings, nested dicts become "key: value" pairs,
    and everything else is JSON-serialized. Entries whose values cannot
    be serialized are logged and skipped rather than raising.

    Args:
        data (dict): The dictionary to convert.

    Returns:
        pd.DataFrame: A DataFrame with "Key" and "Value" columns, one
        row per successfully formatted dictionary entry.
    """
    rows = []
    for key, value in data.items():
        try:
            if isinstance(value, list):
                # Render list items as a comma-separated string.
                rendered = ", ".join(str(element) for element in value)
            elif isinstance(value, dict):
                # Render nested dicts as "key: value" pairs.
                rendered = ", ".join(
                    f"{inner_key}: {inner_value}"
                    for inner_key, inner_value in value.items()
                )
            else:
                # Fall back to JSON serialization (note: plain strings
                # come out quoted, e.g. '"text"').
                rendered = json.dumps(value)
        except (TypeError, ValueError) as e:
            # Skip values json.dumps cannot handle; keep going.
            logger.warning(
                f"Skipping non-serializable key '{key}': {e}"
            )
            continue
        rows.append((key, rendered))

    # Assemble the surviving (key, rendered) pairs into the DataFrame.
    return pd.DataFrame(rows, columns=["Key", "Value"])
# Demo: a one-entry dict becomes a single-row Key/Value DataFrame.
example = dict_to_dataframe(data={"chicken": "noodle_soup"})
# formatter.print_panel(example)
print(example)

@ -0,0 +1,56 @@
# Swarms Corp Culture Document
## **Our Mission and Purpose**
At Swarms Corp, we believe in more than just building technology. We are advancing humanity by pioneering systems that allow agents—both AI and human—to collaborate seamlessly, working toward the betterment of society and unlocking a future of abundance. Our mission is everything, and each of us is here because we understand the transformative potential of our work. We are not just a company; we are a movement aimed at reshaping the future. We strive to create systems that can tackle the most complex challenges facing humanity, from climate change to inequality, with solutions that are powered by collective intelligence.
Our purpose goes beyond just technological advancement. We are here to create tools that empower people, uplift communities, and set a new standard for what technology can achieve when the mission is clear and the commitment is unwavering. We see every project as a step toward something greater—an abundant future where human potential is limitless and artificial intelligence serves as a powerful ally to mankind.
## **Values We Live By**
### 1. **Hard Work: No Stone Unturned**
We believe that hard work is the foundation of all great achievements. At Swarms Corp, each member of the team is dedicated to putting in the effort required to solve complex problems. This isn't just about long hours—it's about focused, intentional work that leads to breakthroughs. We hold each other to high standards, and we don't shy away from the hard paths when the mission calls for it. Every challenge we face is an opportunity to demonstrate our resilience and our commitment to excellence. We understand that the pursuit of groundbreaking innovation demands not just effort, but a relentless curiosity and the courage to face the unknown.
At Swarms Corp, we respect the grind because we know that transformative change doesn't happen overnight. It requires continuous effort, sacrifice, and an unwavering focus on the task at hand. We celebrate hard work, not because it's difficult, but because we understand its potential to transform ambitious ideas into tangible solutions. We honor the sweat equity that goes into building something that can truly make a difference.
### 2. **Mission Above Everything**
Our mission is our guiding star. Every decision, every task, and every project must align with our overarching purpose: advancing humanity and creating a post-scarcity world. This means sometimes putting the collective goal ahead of individual preferences or comfort. We're here to do something much larger than ourselves, and we prioritize the mission with relentless commitment. We know that personal sacrifices will often be necessary, and we embrace that reality because the rewards of our mission are far greater than any individual gain.
When we say "mission above everything," we mean that our focus is not just on immediate success, but on creating a lasting impact that will benefit future generations. Our mission provides meaning and direction to our daily efforts, and we see every task as a small yet crucial part of our broader vision. We remind ourselves constantly of why we are here and who we are working for—not just our customers or stakeholders, but humanity as a whole.
### 3. **Finding the Shortest Path**
Innovation thrives on efficiency. At Swarms Corp, we value finding the shortest, most effective paths to reach our goals. We encourage everyone to question the status quo, challenge existing processes, and ask, “Is there a better way to do this?” Creativity means finding new routes—whether by leveraging automation, questioning outdated steps, or collaborating to uncover insights faster. We honor those who seek smarter paths over conventional ones. Efficiency is not just about saving time—it's about maximizing impact and ensuring that every ounce of effort drives meaningful progress.
Finding the shortest path is about eliminating unnecessary complexity and focusing our energy on what truly matters. We encourage a culture of continuous improvement, where each team member is empowered to innovate on processes, tools, and methodologies. The shortest path does not mean cutting corners—it means removing obstacles, optimizing workflows, and focusing on high-leverage activities that bring us closer to our mission. We celebrate those who find elegant, effective solutions that others might overlook.
### 4. **Advancing Humanity**
The ultimate goal of everything we do is to elevate humanity. We envision a world where intelligence—both human and artificial—works in harmony to improve lives, solve global challenges, and expand possibilities. This ethos drives our work, whether it's developing advanced AI systems, collaborating with others to push technological boundaries, or thinking deeply about how our creations can impact society in positive ways. Every line of code, every idea, and every strategy should move us closer to this vision.
Advancing humanity means we always think about the ethical implications of our work. We are deeply aware that the technology we create has the power to transform lives, and with that power comes the responsibility to ensure our contributions are always positive. We seek not only to push the boundaries of what technology can do but also to ensure that these advancements are inclusive and equitable. Our focus is on building a future where every person has access to the tools and opportunities they need to thrive.
Our vision is to bridge the gap between technology and humanity's most pressing needs. We aim to democratize intelligence, making it available for everyone, regardless of their background or resources. This is how we advance humanity—not just through technological feats, but by ensuring that our innovations serve the greater good and uplift everyone.
## **Our Way of Working**
- **Radical Ownership**: Each team member is not just a contributor but an owner of their domain. We take full responsibility for outcomes, follow through on our promises, and ensure that nothing falls through the cracks. We don't wait for permission—we act, innovate, and lead. Radical ownership means understanding that our actions have a direct impact on the success of our mission. It's about proactive problem-solving and always stepping up when we see an opportunity to make a difference.
- **Honesty and Respect**: We communicate openly and respect each other's opinions. Tough conversations are a natural part of building something impactful. We face challenges head-on with honesty and directness while maintaining a respectful and supportive atmosphere. Honesty fosters trust, and trust is the foundation of any high-performing team. We value feedback and see it as an essential tool for growth—both for individuals and for the organization as a whole.
- **One Team, One Mission**: Collaboration isn't just encouraged—it's essential. We operate as a swarm, where each agent contributes to a greater goal, learning from each other, sharing knowledge, and constantly iterating together. We celebrate wins collectively and approach obstacles with a unified spirit. No one succeeds alone; every achievement is the result of collective effort. We lift each other up, and we know that our strength lies in our unity and shared purpose.
- **The Future is Ours to Shape**: Our work is inherently future-focused. We're not satisfied with simply keeping up—we want to set the pace. Every day, we take one step closer to a future where humanity's potential is limitless, where scarcity is eliminated, and where intelligence—human and machine—advances society. We are not passive participants in the future; we are active shapers of it. We imagine a better tomorrow, and then we take deliberate steps to create it. Our work today will define what the world looks like tomorrow.
## **Expectations**
- **Be Bold**: Don't be afraid to take risks. Innovation requires experimentation, and sometimes that means making mistakes. We support each other in learning from failures and taking smart, calculated risks. Boldness is at the heart of progress. We want every member of Swarms Corp to feel empowered to think outside the box, propose unconventional ideas, and drive innovation. Mistakes are seen not as setbacks, but as opportunities for learning and growth.
- **Keep the Mission First**: Every decision we make should be with our mission in mind. Ask yourself how your work advances the cause of creating an abundant future. The mission is the yardstick against which we measure our efforts, ensuring that everything we do pushes us closer to our ultimate goals. We understand that the mission is bigger than any one of us, and we strive to contribute meaningfully every day.
- **Find Solutions, Not Problems**: While identifying issues is important, we value those who come with solutions. Embrace challenges as opportunities to innovate and find ways to make an impact. We foster a culture of proactive problem-solving where obstacles are seen as opportunities to exercise creativity. If something's broken, we fix it. If there's a better way, we find it. We expect our team members to be solution-oriented, always seeking ways to turn challenges into stepping stones for progress.
- **Think Big, Act Fast**: We're not here to make small changes—we're here to revolutionize how we think about intelligence, automation, and society. Dream big, but work with urgency. We are tackling problems of immense scale, and we must move with intention and speed. Thinking big means envisioning a world that is radically different and better, and acting fast means executing the steps to get us there without hesitation. We value ambition and the courage to move swiftly when the time is right.
## **Our Commitment to You**
Swarms Corp is a place for dreamers and doers, for those who are driven by purpose and are unafraid of the work required to achieve it. We commit to providing you with the tools, support, and environment you need to contribute meaningfully to our mission. We are here to advance humanity together, one agent, one solution, one breakthrough at a time. We pledge to nurture an environment that encourages creativity, collaboration, and bold thinking. Here, you will find a community that celebrates your wins, supports you through challenges, and pushes you to be your best self.
Our commitment also includes ensuring that your voice is heard. We are building the future together, and every perspective matters. We strive to create an inclusive space where diversity of thought is welcomed, and where each team member feels valued for their unique contributions. At Swarms Corp, you are not just part of a team—you are part of a mission that aims to change the course of humanity for the better. Together, we'll make the impossible possible, one breakthrough at a time.

@ -257,6 +257,7 @@ nav:
- An Analysis on Prompting Strategies: "swarms/prompts/overview.md"
- Managing Prompts in Production: "swarms/prompts/main.md"
- Corporate:
- Culture: "corporate/culture.md"
- Hiring: "corporate/hiring.md"
- Swarms Goals & Milestone Tracking; A Vision for 2024 and Beyond: "corporate/2024_2025_goals.md"
- Clusterops:

@ -31,15 +31,16 @@ agent = Agent(
saved_state_path="finance_agent.json",
user_name="swarms_corp",
retry_attempts=1,
streaming_on=True,
context_length=200000,
return_step_meta=True,
output_type="json", # "json", "dict", "csv" OR "string" soon "yaml" and
streaming_on=False,
auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task
artifacts_on=True,
artifacts_output_path="roth_ira_report",
artifacts_file_extension=".txt",
max_tokens=8000,
return_history=True,
)

@ -0,0 +1,228 @@
import inspect
import os
from pathlib import Path
from typing import Optional

from dotenv import load_dotenv
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex
from loguru import logger
from swarm_models import OpenAIChat
from swarms import Agent, AgentRearrange
load_dotenv()
# Get the OpenAI API key from the environment variable
api_key = os.getenv("GROQ_API_KEY")
# Model
model = OpenAIChat(
openai_api_base="https://api.groq.com/openai/v1",
openai_api_key=api_key,
model_name="llama-3.1-70b-versatile",
temperature=0.1,
)
class LlamaIndexDB:
    """A class to manage document indexing and querying using LlamaIndex.

    Documents found in ``data_dir`` are indexed at construction time; the
    resulting index can then be searched via :meth:`query`.

    Args:
        data_dir (str): Directory containing documents to index. Defaults to "docs".
        **kwargs: Additional arguments passed to SimpleDirectoryReader and VectorStoreIndex.
            SimpleDirectoryReader kwargs:
                - filename_as_id (bool): Use filenames as document IDs
                - recursive (bool): Recursively read subdirectories
                - required_exts (List[str]): Only read files with these extensions
                - exclude_hidden (bool): Skip hidden files
            VectorStoreIndex kwargs:
                - service_context: Custom service context
                - embed_model: Custom embedding model
                - similarity_top_k (int): Number of similar docs to retrieve
                - store_nodes_override (bool): Override node storage

    Raises:
        FileNotFoundError: If ``data_dir`` does not exist.
    """

    def __init__(self, data_dir: str = "docs", **kwargs) -> None:
        """Initialize the LlamaIndexDB and index the documents in ``data_dir``.

        Args:
            data_dir (str): Directory containing documents to index
            **kwargs: Additional arguments for SimpleDirectoryReader and VectorStoreIndex
        """
        self.data_dir = data_dir
        self.index: Optional[VectorStoreIndex] = None

        # Route kwargs by the parameters SimpleDirectoryReader actually
        # accepts. The previous filter used __init__.__code__.co_varnames,
        # which also contains the function's *local* variable names and
        # could therefore misroute index kwargs to the reader.
        reader_params = set(
            inspect.signature(
                SimpleDirectoryReader.__init__
            ).parameters
        )
        reader_params.discard("self")
        self.reader_kwargs = {
            k: v for k, v in kwargs.items() if k in reader_params
        }
        # Everything not consumed by the reader goes to the index.
        self.index_kwargs = {
            k: v
            for k, v in kwargs.items()
            if k not in self.reader_kwargs
        }

        logger.info("Initialized LlamaIndexDB")
        data_path = Path(self.data_dir)
        if not data_path.exists():
            logger.error(f"Directory not found: {self.data_dir}")
            raise FileNotFoundError(
                f"Directory {self.data_dir} does not exist"
            )

        try:
            # Load and index every document found under data_dir.
            documents = SimpleDirectoryReader(
                self.data_dir, **self.reader_kwargs
            ).load_data()
            self.index = VectorStoreIndex.from_documents(
                documents, **self.index_kwargs
            )
            logger.success(
                f"Successfully indexed documents from {self.data_dir}"
            )
        except Exception as e:
            logger.error(f"Error indexing documents: {str(e)}")
            raise

    def query(self, query: str, **kwargs) -> str:
        """Query the indexed documents.

        Args:
            query (str): The query string to search for
            **kwargs: Additional arguments passed to the query engine
                - similarity_top_k (int): Number of similar documents to retrieve
                - streaming (bool): Enable streaming response
                - response_mode (str): Response synthesis mode
                - max_tokens (int): Maximum tokens in response

        Returns:
            str: The response from the query engine

        Raises:
            ValueError: If no documents have been indexed yet
        """
        if self.index is None:
            logger.error("No documents have been indexed yet")
            raise ValueError("Must add documents before querying")

        try:
            query_engine = self.index.as_query_engine(**kwargs)
            response = query_engine.query(query)
            print(response)
            logger.info(f"Successfully queried: {query}")
            return str(response)
        except Exception as e:
            logger.error(f"Error during query: {str(e)}")
            raise
# Initialize specialized medical agents
# All five agents share the same Groq-hosted model; they differ only in
# their names, system prompts, and saved-state paths.

# Stage 1: pulls relevant clinical facts out of the raw patient record.
medical_data_extractor = Agent(
    agent_name="Medical-Data-Extractor",
    system_prompt="You are a specialized medical data extraction expert, trained in processing and analyzing clinical data, lab results, medical imaging reports, and patient records. Your role is to carefully extract relevant medical information while maintaining strict HIPAA compliance and patient confidentiality. Focus on identifying key clinical indicators, test results, vital signs, medication histories, and relevant patient history. Pay special attention to temporal relationships between symptoms, treatments, and outcomes. Ensure all extracted data maintains proper medical context and terminology.",
    llm=model,
    max_loops=1,
    autosave=True,
    verbose=True,
    dynamic_temperature_enabled=True,
    saved_state_path="medical_data_extractor.json",
    user_name="medical_team",
    retry_attempts=1,
    context_length=200000,
    output_type="string",
)

# Stage 2: forms a differential diagnosis from the extracted data.
diagnostic_specialist = Agent(
    agent_name="Diagnostic-Specialist",
    system_prompt="You are a senior diagnostic physician with extensive experience in differential diagnosis. Your role is to analyze patient symptoms, lab results, and clinical findings to develop comprehensive diagnostic assessments. Consider all presenting symptoms, patient history, risk factors, and test results to formulate possible diagnoses. Prioritize diagnoses based on clinical probability and severity. Always consider both common and rare conditions that match the symptom pattern. Recommend additional tests or imaging when needed for diagnostic clarity. Follow evidence-based diagnostic criteria and current medical guidelines.",
    llm=model,
    max_loops=1,
    autosave=True,
    verbose=True,
    dynamic_temperature_enabled=True,
    saved_state_path="diagnostic_specialist.json",
    user_name="medical_team",
    retry_attempts=1,
    context_length=200000,
    output_type="string",
)

# Stage 3: turns the diagnosis into a concrete treatment plan.
treatment_planner = Agent(
    agent_name="Treatment-Planner",
    system_prompt="You are an experienced clinical treatment specialist focused on developing comprehensive treatment plans. Your expertise covers both acute and chronic condition management, medication selection, and therapeutic interventions. Consider patient-specific factors including age, comorbidities, allergies, and contraindications when recommending treatments. Incorporate both pharmacological and non-pharmacological interventions. Emphasize evidence-based treatment protocols while considering patient preferences and quality of life. Address potential drug interactions and side effects. Include monitoring parameters and treatment milestones.",
    llm=model,
    max_loops=1,
    autosave=True,
    verbose=True,
    dynamic_temperature_enabled=True,
    saved_state_path="treatment_planner.json",
    user_name="medical_team",
    retry_attempts=1,
    context_length=200000,
    output_type="string",
)

# Stage 4: adds cross-discipline specialist review of the case.
specialist_consultant = Agent(
    agent_name="Specialist-Consultant",
    system_prompt="You are a medical specialist consultant with expertise across multiple disciplines including cardiology, neurology, endocrinology, and internal medicine. Your role is to provide specialized insight for complex cases requiring deep domain knowledge. Analyze cases from your specialist perspective, considering rare conditions and complex interactions between multiple systems. Provide detailed recommendations for specialized testing, imaging, or interventions within your domain. Highlight potential complications or considerations that may not be immediately apparent to general practitioners.",
    llm=model,
    max_loops=1,
    autosave=True,
    verbose=True,
    dynamic_temperature_enabled=True,
    saved_state_path="specialist_consultant.json",
    user_name="medical_team",
    retry_attempts=1,
    context_length=200000,
    output_type="string",
)

# Stage 5: consolidates everything into a coordinated care plan.
patient_care_coordinator = Agent(
    agent_name="Patient-Care-Coordinator",
    system_prompt="You are a patient care coordinator specializing in comprehensive healthcare management. Your role is to ensure holistic patient care by coordinating between different medical specialists, considering patient needs, and managing care transitions. Focus on patient education, medication adherence, lifestyle modifications, and follow-up care planning. Consider social determinants of health, patient resources, and access to care. Develop actionable care plans that patients can realistically follow. Coordinate with other healthcare providers to ensure continuity of care and proper implementation of treatment plans.",
    llm=model,
    max_loops=1,
    autosave=True,
    verbose=True,
    dynamic_temperature_enabled=True,
    saved_state_path="patient_care_coordinator.json",
    user_name="medical_team",
    retry_attempts=1,
    context_length=200000,
    output_type="string",
)
# Initialize the SwarmRouter to coordinate the medical agents
router = AgentRearrange(
    name="medical-diagnosis-treatment-swarm",
    description="Collaborative medical team for comprehensive patient diagnosis and treatment planning",
    max_loops=1,  # Limit to one iteration through the agent flow
    agents=[
        medical_data_extractor,  # First agent to extract medical data
        diagnostic_specialist,  # Second agent to analyze and diagnose
        treatment_planner,  # Third agent to plan treatment
        specialist_consultant,  # Fourth agent to provide specialist input
        patient_care_coordinator,  # Final agent to coordinate care plan
    ],
    # Configure the document storage and retrieval system
    # NOTE(review): assumes AgentRearrange accepts a `memory_system`
    # keyword in the installed swarms version — confirm.
    memory_system=LlamaIndexDB(
        data_dir="docs",  # Directory containing medical documents
        filename_as_id=True,  # Use filenames as document identifiers
        recursive=True,  # Search subdirectories
        # required_exts=[".txt", ".pdf", ".docx"],  # Supported file types
        similarity_top_k=10,  # Return top 10 most relevant documents
    ),
    # Define the sequential flow of information between agents
    flow=f"{medical_data_extractor.agent_name} -> {diagnostic_specialist.agent_name} -> {treatment_planner.agent_name} -> {specialist_consultant.agent_name} -> {patient_care_coordinator.agent_name}",
)

# Example usage
if __name__ == "__main__":
    # Run a comprehensive medical analysis task for patient Lucas Brown
    router.run(
        "Analyze this Lucas Brown's medical data to provide a diagnosis and treatment plan"
    )

@ -0,0 +1,63 @@
import os
import google.generativeai as genai
from loguru import logger
class GeminiModel:
    """
    Represents a GeminiModel instance for generating text based on user input.

    Wraps google.generativeai's ``gemini-1.5-pro`` chat API: configures the
    client from the ``GEMINI_API_KEY`` environment variable and keeps a
    persistent chat session across :meth:`run` calls.
    """

    def __init__(
        self,
        temperature: float = 1.0,
        top_p: float = 0.95,
        top_k: int = 40,
    ):
        """
        Initializes the GeminiModel by setting up the API key, generation
        configuration, and starting a chat session.

        The sampling parameters were previously declared as required but
        silently ignored in favor of hard-coded values; they now carry
        defaults equal to those values and are actually applied, so both
        ``GeminiModel()`` (as in the example usage) and explicit-argument
        calls work.

        Args:
            temperature (float): Sampling temperature. Defaults to 1.0.
            top_p (float): Nucleus-sampling probability mass. Defaults to 0.95.
            top_k (int): Number of highest-probability tokens considered.
                Defaults to 40.

        Raises:
            KeyError: If the GEMINI_API_KEY environment variable is not found.
        """
        try:
            api_key = os.environ["GEMINI_API_KEY"]
            genai.configure(api_key=api_key)
            self.generation_config = {
                "temperature": temperature,
                "top_p": top_p,
                "top_k": top_k,
                "max_output_tokens": 8192,
                "response_mime_type": "text/plain",
            }
            self.model = genai.GenerativeModel(
                model_name="gemini-1.5-pro",
                generation_config=self.generation_config,
            )
            # One persistent session so successive run() calls share history.
            self.chat_session = self.model.start_chat(history=[])
        except KeyError as e:
            logger.error(f"Environment variable not found: {e}")
            raise

    def run(self, task: str) -> str:
        """
        Sends a message to the chat session and returns the response text.

        Args:
            task (str): The input task or message to send to the chat session.

        Returns:
            str: The response text from the chat session.

        Raises:
            Exception: If there's an error running the GeminiModel.
        """
        try:
            response = self.chat_session.send_message(task)
            return response.text
        except Exception as e:
            logger.error(f"Error running GeminiModel: {e}")
            raise
# Example usage
if __name__ == "__main__":
    # NOTE(review): GeminiModel.__init__ declares required
    # temperature/top_p/top_k parameters, so this zero-argument call raises
    # TypeError unless those parameters are given defaults — confirm.
    gemini_model = GeminiModel()
    output = gemini_model.run("INSERT_INPUT_HERE")
    print(output)

@ -0,0 +1,238 @@
"""
Todo
- You send structured data to the swarm through the users form they make
- then connect rag for every agent using llama index to remember all the students data
- structured outputs
"""
import os
from dotenv import load_dotenv
from swarms import Agent, AgentRearrange
from swarm_models import OpenAIChat, OpenAIFunctionCaller
from pydantic import BaseModel
from typing import List
class CollegeLog(BaseModel):
    """Structured record describing a single recommended college."""

    college_name: str
    college_description: str
    college_admission_requirements: str
class CollegesRecommendation(BaseModel):
    """Final recommendation payload: the college list plus the reasoning behind it."""

    colleges: List[CollegeLog]
    reasoning: str
load_dotenv()

# Get the API key from environment variable
api_key = os.getenv("GROQ_API_KEY")

# Initialize the model
# Groq exposes an OpenAI-compatible endpoint, so OpenAIChat is pointed at it.
model = OpenAIChat(
    openai_api_base="https://api.groq.com/openai/v1",
    openai_api_key=api_key,
    model_name="llama-3.1-70b-versatile",
    temperature=0.1,
)

# Prompt for the final-decision stage, shared with function_caller below.
FINAL_AGENT_PROMPT = """
You are a college selection final decision maker. Your role is to:
1. Synthesize all previous analyses and discussions
2. Weigh competing factors and trade-offs
3. Create a final ranked list of recommended colleges
4. Provide clear rationale for each recommendation
5. Include specific action items for each selected school
6. Outline next steps in the application process
Focus on creating actionable, well-reasoned final recommendations that
balance all relevant factors and stakeholder input.
"""

# Structured-output caller: constrains the model's answer to the
# CollegesRecommendation schema defined above.
function_caller = OpenAIFunctionCaller(
    system_prompt=FINAL_AGENT_PROMPT,
    openai_api_key=os.getenv("OPENAI_API_KEY"),
    base_model=CollegesRecommendation,
    parallel_tool_calls=True,
)
# Student Profile Analyzer Agent
profile_analyzer_agent = Agent(
agent_name="Student-Profile-Analyzer",
system_prompt="""You are an expert student profile analyzer. Your role is to:
1. Analyze academic performance, test scores, and extracurricular activities
2. Identify student's strengths, weaknesses, and unique qualities
3. Evaluate personal statements and essays
4. Assess leadership experiences and community involvement
5. Determine student's preferences for college environment, location, and programs
6. Create a comprehensive student profile summary
Always consider both quantitative metrics (GPA, test scores) and qualitative aspects
(personal growth, challenges overcome, unique perspectives).""",
llm=model,
max_loops=1,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="profile_analyzer_agent.json",
user_name="student",
context_length=200000,
output_type="string",
)
# College Research Agent
college_research_agent = Agent(
agent_name="College-Research-Specialist",
system_prompt="""You are a college research specialist. Your role is to:
1. Maintain updated knowledge of college admission requirements
2. Research academic programs, campus culture, and student life
3. Analyze admission statistics and trends
4. Evaluate college-specific opportunities and resources
5. Consider financial aid availability and scholarship opportunities
6. Track historical admission data and acceptance rates
Focus on providing accurate, comprehensive information about each institution
while considering both academic and cultural fit factors.""",
llm=model,
max_loops=1,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="college_research_agent.json",
user_name="researcher",
context_length=200000,
output_type="string",
)
# College Match Agent
college_match_agent = Agent(
agent_name="College-Match-Maker",
system_prompt="""You are a college matching specialist. Your role is to:
1. Compare student profiles with college requirements
2. Evaluate fit based on academic, social, and cultural factors
3. Consider geographic preferences and constraints
4. Assess financial fit and aid opportunities
5. Create tiered lists of reach, target, and safety schools
6. Explain the reasoning behind each match
Always provide a balanced list with realistic expectations while
considering both student preferences and admission probability.""",
llm=model,
max_loops=1,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="college_match_agent.json",
user_name="matcher",
context_length=200000,
output_type="string",
)
# Debate Moderator Agent
debate_moderator_agent = Agent(
agent_name="Debate-Moderator",
system_prompt="""You are a college selection debate moderator. Your role is to:
1. Facilitate discussions between different perspectives
2. Ensure all relevant factors are considered
3. Challenge assumptions and biases
4. Synthesize different viewpoints
5. Guide the group toward consensus
6. Document key points of agreement and disagreement
Maintain objectivity while ensuring all important factors are thoroughly discussed
and evaluated.""",
llm=model,
max_loops=1,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="debate_moderator_agent.json",
user_name="moderator",
context_length=200000,
output_type="string",
)
# Critique Agent
critique_agent = Agent(
agent_name="College-Selection-Critic",
system_prompt="""You are a college selection critic. Your role is to:
1. Evaluate the strength of college matches
2. Identify potential overlooked factors
3. Challenge assumptions in the selection process
4. Assess risks and potential drawbacks
5. Provide constructive feedback on selections
6. Suggest alternative options when appropriate
Focus on constructive criticism that helps improve the final college list
while maintaining realistic expectations.""",
llm=model,
max_loops=1,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="critique_agent.json",
user_name="critic",
context_length=200000,
output_type="string",
)
# Final Decision Agent
final_decision_agent = Agent(
agent_name="Final-Decision-Maker",
system_prompt="""
You are a college selection final decision maker. Your role is to:
1. Synthesize all previous analyses and discussions
2. Weigh competing factors and trade-offs
3. Create a final ranked list of recommended colleges
4. Provide clear rationale for each recommendation
5. Include specific action items for each selected school
6. Outline next steps in the application process
Focus on creating actionable, well-reasoned final recommendations that
balance all relevant factors and stakeholder input.
""",
llm=model,
max_loops=1,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="final_decision_agent.json",
user_name="decision_maker",
context_length=200000,
output_type="string",
)
# Initialize the Sequential Workflow
college_selection_workflow = AgentRearrange(
    name="college-selection-swarm",
    description="Comprehensive college selection and analysis system",
    max_loops=1,
    agents=[
        profile_analyzer_agent,
        college_research_agent,
        college_match_agent,
        debate_moderator_agent,
        critique_agent,
        final_decision_agent,
    ],
    output_type="all",
    # Build the flow string from each Agent's `agent_name` — the attribute
    # every agent above is constructed with, and the one the sibling medical
    # example uses for its flow. The previous version read `.name`, which is
    # not set by these Agent(...) constructors.
    flow=f"{profile_analyzer_agent.agent_name} -> {college_research_agent.agent_name} -> {college_match_agent.agent_name} -> {debate_moderator_agent.agent_name} -> {critique_agent.agent_name} -> {final_decision_agent.agent_name}",
)
# Example usage
if __name__ == "__main__":
    # Example student profile input
    student_profile = """
    Student Profile:
    - GPA: 3.8
    - SAT: 1450
    - Interests: Computer Science, Robotics
    - Location Preference: East Coast
    - Extracurriculars: Robotics Club President, Math Team
    - Budget: Need financial aid
    - Preferred Environment: Medium-sized urban campus
    """

    # Run the comprehensive college selection analysis
    # NOTE(review): this passes `no_use_clusterops=True`, while another
    # example in this change set passes `no_clusterops=True` — confirm the
    # expected keyword for the installed swarms version.
    result = college_selection_workflow.run(
        student_profile,
        no_use_clusterops=True,
    )
    print(result)

@ -0,0 +1,64 @@
"""
Todo
- You send structured data to the swarm through the users form they make
- then connect rag for every agent using llama index to remember all the students data
- structured outputs
"""
import os
from dotenv import load_dotenv
from swarm_models import OpenAIChat, OpenAIFunctionCaller
from pydantic import BaseModel
from typing import List
class CollegeLog(BaseModel):
    """Structured record describing a single recommended college."""

    # Human-readable college name
    college_name: str
    # Short description of the institution
    college_description: str
    # Summary of admission requirements relevant to the student
    college_admission_requirements: str


class CollegesRecommendation(BaseModel):
    """Structured output schema for the function caller: the recommended
    colleges plus the model's reasoning for the selection."""

    colleges: List[CollegeLog]
    reasoning: str
load_dotenv()

# Get the API key from environment variable
api_key = os.getenv("GROQ_API_KEY")

# Initialize the chat model against Groq's OpenAI-compatible endpoint.
# NOTE(review): `model` is created but never used in this script — confirm
# whether it is intended for a later step or can be removed.
model = OpenAIChat(
    openai_api_base="https://api.groq.com/openai/v1",
    openai_api_key=api_key,
    model_name="llama-3.1-70b-versatile",
    temperature=0.1,  # low temperature for consistent, deterministic output
)

# Function caller that forces the response into the CollegesRecommendation
# schema; uses the OpenAI key (not Groq) for structured tool calling.
function_caller = OpenAIFunctionCaller(
    system_prompt="""You are a college selection final decision maker. Your role is to:
- Balance all relevant factors and stakeholder input.
- Only return the output in the schema format.
""",
    openai_api_key=os.getenv("OPENAI_API_KEY"),
    base_model=CollegesRecommendation,
    # parallel_tool_calls=True,
)

# One-shot run: feed a sample student profile and print the structured result.
print(
    function_caller.run(
        """
Student Profile: Kye Gomez
- GPA: 3.8
- SAT: 1450
- Interests: Computer Science, Robotics
- Location Preference: East Coast
- Extracurriculars: Robotics Club President, Math Team
- Budget: Need financial aid
- Preferred Environment: Medium-sized urban campus
"""
    )
)

@ -0,0 +1,116 @@
from typing import Optional
from pathlib import Path
from loguru import logger
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
class LlamaIndexDB:
    """Manage document indexing and querying using LlamaIndex.

    Documents are read from ``data_dir`` at construction time and indexed
    into an in-memory :class:`VectorStoreIndex`; :meth:`query` then runs
    semantic queries against that index.

    Args:
        data_dir (str): Directory containing documents to index. Defaults to "docs".
        **kwargs: Additional keyword arguments, routed automatically:
            - names accepted by ``SimpleDirectoryReader.__init__`` (e.g.
              ``filename_as_id``, ``recursive``, ``required_exts``,
              ``exclude_hidden``) go to the reader;
            - everything else (e.g. ``embed_model``, ``similarity_top_k``,
              ``store_nodes_override``, ``service_context``) goes to
              ``VectorStoreIndex.from_documents``.

    Raises:
        FileNotFoundError: If ``data_dir`` does not exist.
    """

    def __init__(self, data_dir: str = "docs", **kwargs) -> None:
        """Initialize the DB and eagerly index all documents in ``data_dir``.

        Args:
            data_dir (str): Directory containing documents to index.
            **kwargs: Additional arguments for SimpleDirectoryReader and
                VectorStoreIndex (see class docstring for routing rules).
        """
        self.data_dir = data_dir
        self.index: Optional[VectorStoreIndex] = None

        # Split kwargs by inspecting the reader's actual signature.
        # Fix: the previous approach tested membership in
        # SimpleDirectoryReader.__init__.__code__.co_varnames, which also
        # contains the function's *local* variables, so index kwargs that
        # happened to share a local's name were mis-routed to the reader.
        reader_params = set(
            inspect.signature(SimpleDirectoryReader.__init__).parameters
        )
        self.reader_kwargs = {
            k: v for k, v in kwargs.items() if k in reader_params
        }
        self.index_kwargs = {
            k: v
            for k, v in kwargs.items()
            if k not in self.reader_kwargs
        }

        logger.info("Initialized LlamaIndexDB")
        data_path = Path(self.data_dir)
        if not data_path.exists():
            logger.error(f"Directory not found: {self.data_dir}")
            raise FileNotFoundError(
                f"Directory {self.data_dir} does not exist"
            )

        try:
            documents = SimpleDirectoryReader(
                self.data_dir, **self.reader_kwargs
            ).load_data()
            self.index = VectorStoreIndex.from_documents(
                documents, **self.index_kwargs
            )
            logger.success(
                f"Successfully indexed documents from {self.data_dir}"
            )
        except Exception as e:
            logger.error(f"Error indexing documents: {str(e)}")
            raise

    def query(self, query: str, **kwargs) -> str:
        """Query the indexed documents.

        Args:
            query (str): The query string to search for.
            **kwargs: Additional arguments passed to the query engine, e.g.
                ``similarity_top_k``, ``streaming``, ``response_mode``,
                ``max_tokens``.

        Returns:
            str: The response from the query engine.

        Raises:
            ValueError: If no documents have been indexed yet.
        """
        if self.index is None:
            logger.error("No documents have been indexed yet")
            raise ValueError("Must add documents before querying")

        try:
            query_engine = self.index.as_query_engine(**kwargs)
            response = query_engine.query(query)
            # Fix: removed stray debug print(response) — the result is both
            # logged and returned; printing here polluted callers' stdout.
            logger.info(f"Successfully queried: {query}")
            return str(response)
        except Exception as e:
            logger.error(f"Error during query: {str(e)}")
            raise
# # Example usage
# llama_index_db = LlamaIndexDB(
# data_dir="docs",
# filename_as_id=True,
# recursive=True,
# required_exts=[".txt", ".pdf", ".docx"],
# similarity_top_k=3
# )
# response = llama_index_db.query(
# "What is the medical history of patient 1?",
# streaming=True,
# response_mode="compact"
# )
# print(response)

@ -0,0 +1,237 @@
"""
Todo
- You send structured data to the swarm through the users form they make
- then connect rag for every agent using llama index to remember all the students data
- structured outputs
"""
import os
from dotenv import load_dotenv
from swarms import Agent, SequentialWorkflow
from swarm_models import OpenAIChat, OpenAIFunctionCaller
from pydantic import BaseModel
from typing import List
class CollegeLog(BaseModel):
    """Structured record describing a single recommended college."""

    college_name: str
    college_description: str
    college_admission_requirements: str


class CollegesRecommendation(BaseModel):
    """Structured output schema: recommended colleges plus the model's
    reasoning for the selection."""

    colleges: List[CollegeLog]
    reasoning: str
load_dotenv()

# Get the API key from environment variable
api_key = os.getenv("GROQ_API_KEY")

# Initialize the shared chat model (Groq's OpenAI-compatible endpoint);
# every agent in this workflow uses this same LLM.
model = OpenAIChat(
    openai_api_base="https://api.groq.com/openai/v1",
    openai_api_key=api_key,
    model_name="llama-3.1-70b-versatile",
    temperature=0.1,  # low temperature for consistent outputs
)

# Prompt shared with the structured-output function caller below.
FINAL_AGENT_PROMPT = """
You are a college selection final decision maker. Your role is to:
1. Synthesize all previous analyses and discussions
2. Weigh competing factors and trade-offs
3. Create a final ranked list of recommended colleges
4. Provide clear rationale for each recommendation
5. Include specific action items for each selected school
6. Outline next steps in the application process
Focus on creating actionable, well-reasoned final recommendations that
balance all relevant factors and stakeholder input.
"""

# Structured-output caller constrained to the CollegesRecommendation schema.
# NOTE(review): `function_caller` is not referenced by the workflow below —
# confirm whether it should be wired in or removed.
function_caller = OpenAIFunctionCaller(
    system_prompt=FINAL_AGENT_PROMPT,
    openai_api_key=os.getenv("OPENAI_API_KEY"),
    base_model=CollegesRecommendation,
    parallel_tool_calls=True,
)
# Student Profile Analyzer Agent — stage 1: builds the student summary.
profile_analyzer_agent = Agent(
    agent_name="Student-Profile-Analyzer",
    system_prompt="""You are an expert student profile analyzer. Your role is to:
1. Analyze academic performance, test scores, and extracurricular activities
2. Identify student's strengths, weaknesses, and unique qualities
3. Evaluate personal statements and essays
4. Assess leadership experiences and community involvement
5. Determine student's preferences for college environment, location, and programs
6. Create a comprehensive student profile summary
Always consider both quantitative metrics (GPA, test scores) and qualitative aspects
(personal growth, challenges overcome, unique perspectives).""",
    llm=model,
    max_loops=1,
    verbose=True,
    dynamic_temperature_enabled=True,
    saved_state_path="profile_analyzer_agent.json",
    user_name="student",
    context_length=200000,
    output_type="string",
)

# College Research Agent — stage 2: gathers institution facts.
college_research_agent = Agent(
    agent_name="College-Research-Specialist",
    system_prompt="""You are a college research specialist. Your role is to:
1. Maintain updated knowledge of college admission requirements
2. Research academic programs, campus culture, and student life
3. Analyze admission statistics and trends
4. Evaluate college-specific opportunities and resources
5. Consider financial aid availability and scholarship opportunities
6. Track historical admission data and acceptance rates
Focus on providing accurate, comprehensive information about each institution
while considering both academic and cultural fit factors.""",
    llm=model,
    max_loops=1,
    verbose=True,
    dynamic_temperature_enabled=True,
    saved_state_path="college_research_agent.json",
    user_name="researcher",
    context_length=200000,
    output_type="string",
)

# College Match Agent — stage 3: pairs the profile with institutions.
college_match_agent = Agent(
    agent_name="College-Match-Maker",
    system_prompt="""You are a college matching specialist. Your role is to:
1. Compare student profiles with college requirements
2. Evaluate fit based on academic, social, and cultural factors
3. Consider geographic preferences and constraints
4. Assess financial fit and aid opportunities
5. Create tiered lists of reach, target, and safety schools
6. Explain the reasoning behind each match
Always provide a balanced list with realistic expectations while
considering both student preferences and admission probability.""",
    llm=model,
    max_loops=1,
    verbose=True,
    dynamic_temperature_enabled=True,
    saved_state_path="college_match_agent.json",
    user_name="matcher",
    context_length=200000,
    output_type="string",
)

# Debate Moderator Agent — stage 4: surfaces and reconciles viewpoints.
debate_moderator_agent = Agent(
    agent_name="Debate-Moderator",
    system_prompt="""You are a college selection debate moderator. Your role is to:
1. Facilitate discussions between different perspectives
2. Ensure all relevant factors are considered
3. Challenge assumptions and biases
4. Synthesize different viewpoints
5. Guide the group toward consensus
6. Document key points of agreement and disagreement
Maintain objectivity while ensuring all important factors are thoroughly discussed
and evaluated.""",
    llm=model,
    max_loops=1,
    verbose=True,
    dynamic_temperature_enabled=True,
    saved_state_path="debate_moderator_agent.json",
    user_name="moderator",
    context_length=200000,
    output_type="string",
)

# Critique Agent — stage 5: stress-tests the proposed matches.
critique_agent = Agent(
    agent_name="College-Selection-Critic",
    system_prompt="""You are a college selection critic. Your role is to:
1. Evaluate the strength of college matches
2. Identify potential overlooked factors
3. Challenge assumptions in the selection process
4. Assess risks and potential drawbacks
5. Provide constructive feedback on selections
6. Suggest alternative options when appropriate
Focus on constructive criticism that helps improve the final college list
while maintaining realistic expectations.""",
    llm=model,
    max_loops=1,
    verbose=True,
    dynamic_temperature_enabled=True,
    saved_state_path="critique_agent.json",
    user_name="critic",
    context_length=200000,
    output_type="string",
)

# Final Decision Agent — stage 6: produces the ranked recommendation list.
final_decision_agent = Agent(
    agent_name="Final-Decision-Maker",
    system_prompt="""
You are a college selection final decision maker. Your role is to:
1. Synthesize all previous analyses and discussions
2. Weigh competing factors and trade-offs
3. Create a final ranked list of recommended colleges
4. Provide clear rationale for each recommendation
5. Include specific action items for each selected school
6. Outline next steps in the application process
Focus on creating actionable, well-reasoned final recommendations that
balance all relevant factors and stakeholder input.
""",
    llm=model,
    max_loops=1,
    verbose=True,
    dynamic_temperature_enabled=True,
    saved_state_path="final_decision_agent.json",
    user_name="decision_maker",
    context_length=200000,
    output_type="string",
)
# Initialize the Sequential Workflow: the six agents run in list order,
# each receiving the previous agent's output.
college_selection_workflow = SequentialWorkflow(
    name="college-selection-swarm",
    description="Comprehensive college selection and analysis system",
    max_loops=1,
    agents=[
        profile_analyzer_agent,
        college_research_agent,
        college_match_agent,
        debate_moderator_agent,
        critique_agent,
        final_decision_agent,
    ],
    output_type="all",  # return every agent's output, not just the final one
)

# Example usage
if __name__ == "__main__":
    # Example student profile input (free text; interpreted by the agents)
    student_profile = """
Student Profile:
- GPA: 3.8
- SAT: 1450
- Interests: Computer Science, Robotics
- Location Preference: East Coast
- Extracurriculars: Robotics Club President, Math Team
- Budget: Need financial aid
- Preferred Environment: Medium-sized urban campus
"""
    # Run the comprehensive college selection analysis
    result = college_selection_workflow.run(
        student_profile,
        no_use_clusterops=True,  # run locally, bypassing clusterops routing
    )
    print(result)

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "6.2.0"
version = "6.2.8"
description = "Swarms - Pytorch"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]
@ -79,6 +79,7 @@ clusterops = "*"
chromadb = "*"
reportlab = "*"
doc-master = "*"
rich = "*"
[tool.poetry.scripts]
swarms = "swarms.cli.main:main"

@ -12,6 +12,7 @@ loguru==0.7.2
pydantic==2.8.2
tenacity==8.5.0
Pillow==10.4.0
rich
psutil
sentry-sdk
python-dotenv

@ -112,6 +112,7 @@ router = SequentialWorkflow(
if __name__ == "__main__":
# Run a comprehensive private equity document analysis task
result = router.run(
"Where is the best place to find template term sheets for series A startups. Provide links and references"
"Where is the best place to find template term sheets for series A startups. Provide links and references",
img=None,
)
print(result)

@ -0,0 +1,143 @@
import os
from dotenv import load_dotenv
from swarms import Agent, SequentialWorkflow
from swarm_models import OpenAIChat
load_dotenv()

# Get the OpenAI API key from the environment variable
# (the Groq key is used with Groq's OpenAI-compatible endpoint below)
api_key = os.getenv("GROQ_API_KEY")

# Shared LLM for all analysis agents in this workflow.
model = OpenAIChat(
    openai_api_base="https://api.groq.com/openai/v1",
    openai_api_key=api_key,
    model_name="llama-3.1-70b-versatile",
    temperature=0.1,  # low temperature for consistent analytical output
)
# Initialize specialized agents — one per due-diligence discipline; all
# share the same LLM and differ only in their system prompts.
data_extractor_agent = Agent(
    agent_name="Data-Extractor",
    system_prompt="""You are a data extraction specialist. Your role is to:
1. Extract key information, data points, and metrics from documents
2. Identify and pull out important facts, figures, and statistics
3. Structure extracted data in a clear, organized format
4. Flag any inconsistencies or missing data
5. Ensure accuracy in data extraction while maintaining context""",
    llm=model,
    max_loops=1,
    autosave=True,
    verbose=True,
    dynamic_temperature_enabled=True,
    saved_state_path="data_extractor_agent.json",
    user_name="pe_firm",
    retry_attempts=1,
    context_length=200000,
    output_type="string",
)

summarizer_agent = Agent(
    agent_name="Document-Summarizer",
    system_prompt="""You are a document summarization expert. Your role is to:
1. Create concise, comprehensive summaries of documents
2. Highlight key points and main takeaways
3. Maintain the essential meaning while reducing length
4. Structure summaries in a logical, readable format
5. Identify and emphasize critical insights""",
    llm=model,
    max_loops=1,
    autosave=True,
    verbose=True,
    dynamic_temperature_enabled=True,
    saved_state_path="summarizer_agent.json",
    user_name="pe_firm",
    retry_attempts=1,
    context_length=200000,
    output_type="string",
)

financial_analyst_agent = Agent(
    agent_name="Financial-Analyst",
    system_prompt="""You are a financial analysis expert. Your role is to:
1. Analyze financial statements and metrics
2. Evaluate company valuations and financial projections
3. Assess financial risks and opportunities
4. Provide insights on financial performance and health
5. Make recommendations based on financial analysis""",
    llm=model,
    max_loops=1,
    autosave=True,
    verbose=True,
    dynamic_temperature_enabled=True,
    saved_state_path="financial_analyst_agent.json",
    user_name="pe_firm",
    retry_attempts=1,
    context_length=200000,
    output_type="string",
)

market_analyst_agent = Agent(
    agent_name="Market-Analyst",
    system_prompt="""You are a market analysis expert. Your role is to:
1. Analyze market trends and dynamics
2. Evaluate competitive landscape and market positioning
3. Identify market opportunities and threats
4. Assess market size and growth potential
5. Provide strategic market insights and recommendations""",
    llm=model,
    max_loops=1,
    autosave=True,
    verbose=True,
    dynamic_temperature_enabled=True,
    saved_state_path="market_analyst_agent.json",
    user_name="pe_firm",
    retry_attempts=1,
    context_length=200000,
    output_type="string",
)

operational_analyst_agent = Agent(
    agent_name="Operational-Analyst",
    system_prompt="""You are an operational analysis expert. Your role is to:
1. Analyze business operations and processes
2. Evaluate operational efficiency and effectiveness
3. Identify operational risks and opportunities
4. Assess scalability and growth potential
5. Provide recommendations for operational improvements""",
    llm=model,
    # NOTE(review): every sibling agent uses max_loops=1 — confirm whether
    # 2 is intentional here or a copy-paste slip.
    max_loops=2,
    autosave=True,
    verbose=True,
    dynamic_temperature_enabled=True,
    saved_state_path="operational_analyst_agent.json",
    user_name="pe_firm",
    retry_attempts=1,
    context_length=200000,
    output_type="string",
)
# Initialize the sequential workflow (the stale "SwarmRouter" name refers
# to an earlier implementation; `router` is a SequentialWorkflow).
router = SequentialWorkflow(
    name="pe-document-analysis-swarm",
    description="Analyze documents for private equity due diligence and investment decision-making",
    max_loops=1,
    agents=[
        data_extractor_agent,
        summarizer_agent,
        financial_analyst_agent,
        market_analyst_agent,
        operational_analyst_agent,
    ],
    output_type="all",  # return every agent's output, not just the final one
)

# Example usage
if __name__ == "__main__":
    # Run a comprehensive private equity document analysis task
    result = router.run(
        "Where is the best place to find template term sheets for series A startups. Provide links and references",
        no_use_clusterops=True,  # run locally, bypassing clusterops routing
    )
    print(result)

@ -173,10 +173,6 @@ class OnboardingProcess:
self.ask_input(
"Enter your email (or type 'quit' to exit): ", "email"
)
self.ask_input(
"Enter your Swarms API key (or type 'quit' to exit): Get this in your swarms dashboard: https://swarms.world/platform/api-keys ",
"swarms_api_key",
)
workspace = self.ask_input(
"Enter your WORKSPACE_DIR: This is where logs, errors, and agent configurations will be stored (or type 'quit' to exit). Remember to set this as an environment variable: https://docs.swarms.world/en/latest/swarms/install/quickstart/ || ",
"workspace_dir",

@ -53,6 +53,7 @@ from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.wrapper_clusterop import (
exec_callable_with_clusterops,
)
from swarms.utils.formatter import formatter
logger = initialize_logger(log_folder="agents")
@ -338,6 +339,7 @@ class Agent:
device_id: int = 0,
scheduled_run_date: Optional[datetime] = None,
do_not_use_cluster_ops: bool = True,
all_gpus: bool = False,
*args,
**kwargs,
):
@ -452,6 +454,7 @@ class Agent:
self.device_id = device_id
self.scheduled_run_date = scheduled_run_date
self.do_not_use_cluster_ops = do_not_use_cluster_ops
self.all_gpus = all_gpus
# Initialize the short term memory
self.short_memory = Conversation(
@ -685,11 +688,11 @@ class Agent:
try:
if hasattr(self.llm, "temperature"):
# Randomly change the temperature attribute of self.llm object
logger.info("Enabling Random Dyamic Temperature")
self.llm.temperature = random.uniform(0.0, 1.0)
logger.info(f"Temperature: {self.llm.temperature}")
else:
# Use a default temperature
self.llm.temperature = 0.7
self.llm.temperature = 0.5
except Exception as error:
print(
colored(
@ -759,6 +762,7 @@ class Agent:
task: Optional[str] = None,
img: Optional[str] = None,
is_last: bool = False,
print_task: bool = False,
*args,
**kwargs,
) -> Any:
@ -801,6 +805,15 @@ class Agent:
if self.long_term_memory is not None:
self.memory_query(task)
# Print the user's request
# Print the request
if print_task is True:
formatter.print_panel(
f"\n User: {task}",
f"Task Request for {self.agent_name}",
)
while (
self.max_loops == "auto"
or loop_count < self.max_loops
@ -847,9 +860,17 @@ class Agent:
# Print
if self.streaming_on is True:
self.stream_response(response)
# self.stream_response(response)
formatter.print_panel_token_by_token(
f"{self.agent_name}: {response}",
title=f"Agent Name: {self.agent_name} [Max Loops: {loop_count}]",
)
else:
logger.info(f"Response: {response}")
# logger.info(f"Response: {response}")
formatter.print_panel(
f"{self.agent_name}: {response}",
f"Agent Name {self.agent_name} [Max Loops: {loop_count} ]",
)
# Check if response is a dictionary and has 'choices' key
if (
@ -1026,7 +1047,12 @@ class Agent:
elif self.return_step_meta is True:
return self.agent_output.model_dump_json(indent=4)
elif self.return_history is True:
return self.short_memory.get_str()
history = self.short_memory.get_str()
formatter.print_panel(
history, title=f"{self.agent_name} History"
)
return history
else:
raise ValueError(
f"Invalid output type: {self.output_type}"
@ -2358,3 +2384,26 @@ class Agent:
f"Unexpected error handling artifact: {str(e)}"
)
raise
def showcase_config(self):
    """Render this agent's configuration as a rich table.

    Each value from ``to_dict()`` is flattened to a short string and then
    wrapped in a single-element list, because ``formatter.print_table``
    expects ``Dict[str, List[str]]`` and renders each value with
    ``"\\n".join(items)`` — passing a bare string there would be joined
    character by character, printing one letter per line.

    Returns:
        The return value of ``formatter.print_table`` (prints as a side
        effect).
    """
    config_dict = self.to_dict()
    for key, value in config_dict.items():
        if isinstance(value, list):
            # Format list as a comma-separated string
            flat = ", ".join(str(item) for item in value)
        elif isinstance(value, dict):
            # Format dict as key-value pairs in a single string
            flat = ", ".join(f"{k}: {v}" for k, v in value.items())
        else:
            # Ensure any non-iterable value is a string
            flat = str(value)
        # Fix: wrap in a one-element list so print_table's "\n".join
        # leaves the string intact instead of splitting it per character.
        config_dict[key] = [flat]
    return formatter.print_table(
        f"Agent: {self.agent_name} Configuration", config_dict
    )

@ -8,7 +8,6 @@ from multiprocessing import cpu_count
import os
from swarms.structs.agent import Agent
from swarms.utils.calculate_func_metrics import profile_func
from swarms.utils.wrapper_clusterop import (
exec_callable_with_clusterops,
)
@ -63,7 +62,6 @@ async def run_agents_concurrently_async(
return results
@profile_func
def run_agents_concurrently(
agents: List[AgentType],
task: str,
@ -109,7 +107,6 @@ def run_agents_concurrently(
return results
@profile_func
def run_agents_concurrently_multiprocess(
agents: List[Agent], task: str, batch_size: int = cpu_count()
) -> List[Any]:
@ -139,7 +136,6 @@ def run_agents_concurrently_multiprocess(
return results
@profile_func
def run_agents_sequentially(
agents: List[AgentType], task: str
) -> List[Any]:
@ -156,7 +152,6 @@ def run_agents_sequentially(
return [run_single_agent(agent, task) for agent in agents]
@profile_func
def run_agents_with_different_tasks(
agent_task_pairs: List[tuple[AgentType, str]],
batch_size: int = None,
@ -233,7 +228,6 @@ async def run_agent_with_timeout(
return None
@profile_func
def run_agents_with_timeout(
agents: List[AgentType],
task: str,
@ -299,7 +293,6 @@ def get_system_metrics() -> ResourceMetrics:
)
@profile_func
def run_agents_with_resource_monitoring(
agents: List[AgentType],
task: str,
@ -335,7 +328,6 @@ def run_agents_with_resource_monitoring(
# Implementation details...
@profile_func
def _run_agents_with_tasks_concurrently(
agents: List[AgentType],
tasks: List[str] = [],
@ -400,8 +392,9 @@ def run_agents_with_tasks_concurrently(
batch_size: int = None,
max_workers: int = None,
device: str = "cpu",
device_id: int = 0,
device_id: int = 1,
all_cores: bool = True,
no_clusterops: bool = False,
) -> List[Any]:
"""
Executes a list of agents with their corresponding tasks concurrently on a specified device.
@ -420,16 +413,23 @@ def run_agents_with_tasks_concurrently(
Returns:
List[Any]: A list of outputs from each agent execution.
"""
return exec_callable_with_clusterops(
device,
device_id,
all_cores,
_run_agents_with_tasks_concurrently,
agents,
tasks,
batch_size,
max_workers,
)
# Make the first agent not use the ifrs
if no_clusterops:
return _run_agents_with_tasks_concurrently(
agents, tasks, batch_size, max_workers
)
else:
return exec_callable_with_clusterops(
device,
device_id,
all_cores,
_run_agents_with_tasks_concurrently,
agents,
tasks,
batch_size,
max_workers,
)
# # Example usage:

@ -176,7 +176,7 @@ class AgentRearrange(BaseSwarm):
# self.handle_input_docs()
# Show the agents whose in the swarm
self.showcase_agents()
# self.showcase_agents()
def showcase_agents(self):
# Get formatted agent info once
@ -380,6 +380,11 @@ class AgentRearrange(BaseSwarm):
)
else:
agent = self.agents[agent_name]
current_task = (
str(current_task)
if current_task
else ""
)
result = agent.run(
task=current_task,
img=img,
@ -387,6 +392,7 @@ class AgentRearrange(BaseSwarm):
*args,
**kwargs,
)
result = str(result)
results.append(result)
response_dict[agent_name] = result
self.output_schema.outputs.append(
@ -423,6 +429,11 @@ class AgentRearrange(BaseSwarm):
response_dict[agent_name] = current_task
else:
agent = self.agents[agent_name]
current_task = (
str(current_task)
if current_task
else ""
)
current_task = agent.run(
task=current_task,
img=img,
@ -430,6 +441,7 @@ class AgentRearrange(BaseSwarm):
*args,
**kwargs,
)
current_task = str(current_task)
response_dict[agent_name] = current_task
self.output_schema.outputs.append(
agent.agent_output
@ -470,9 +482,10 @@ class AgentRearrange(BaseSwarm):
task: str = None,
img: str = None,
device: str = "cpu",
device_id: int = 1,
device_id: int = 2,
all_cores: bool = True,
all_gpus: bool = False,
no_use_clusterops: bool = False,
*args,
**kwargs,
):
@ -486,23 +499,32 @@ class AgentRearrange(BaseSwarm):
device_id (int, optional): ID of specific device to use. Defaults to 1.
all_cores (bool, optional): Whether to use all CPU cores. Defaults to True.
all_gpus (bool, optional): Whether to use all available GPUs. Defaults to False.
no_use_clusterops (bool, optional): Whether to use clusterops. Defaults to False.
*args: Additional positional arguments passed to _run().
**kwargs: Additional keyword arguments passed to _run().
Returns:
The result from executing the task through the cluster operations wrapper.
"""
return exec_callable_with_clusterops(
device=device,
device_id=device_id,
all_cores=all_cores,
all_gpus=all_gpus,
func=self._run,
task=task,
img=img,
*args,
**kwargs,
)
if no_use_clusterops:
return self._run(
task=task,
img=img,
*args,
**kwargs,
)
else:
return exec_callable_with_clusterops(
device=device,
device_id=device_id,
all_cores=all_cores,
all_gpus=all_gpus,
func=self._run,
task=task,
img=img,
*args,
**kwargs,
)
def __call__(self, task: str, *args, **kwargs):
"""

@ -1,8 +1,7 @@
from typing import List
from typing import List, Optional
from swarms.structs.agent import Agent
from swarms.structs.rearrange import AgentRearrange, OutputType
from concurrent.futures import ThreadPoolExecutor, as_completed
from swarms.structs.agents_available import showcase_available_agents
from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger(log_folder="sequential_workflow")
@ -59,9 +58,6 @@ class SequentialWorkflow:
**kwargs,
)
# Handle agent showcase
self.handle_agent_showcase()
def sequential_flow(self):
# Only create flow if agents exist
if self.agents:
@ -101,31 +97,17 @@ class SequentialWorkflow:
if self.max_loops == 0:
raise ValueError("max_loops cannot be 0")
if self.output_type not in OutputType:
raise ValueError(
"output_type must be 'all', 'final', 'list', 'dict', '.json', '.md', '.txt', '.yaml', or '.toml'"
)
logger.info("Checks completed your swarm is ready.")
def handle_agent_showcase(self):
# Get the showcase string once instead of regenerating for each agent
showcase_str = showcase_available_agents(
name=self.name,
description=self.description,
agents=self.agents,
)
# Append showcase string to each agent's existing system prompt
for agent in self.agents:
agent.system_prompt += showcase_str
def run(
self,
task: str,
img: Optional[str] = None,
device: str = "cpu",
all_cpus: bool = False,
auto_gpu: bool = False,
all_cores: bool = False,
all_gpus: bool = False,
device_id: int = 0,
no_use_clusterops: bool = False,
*args,
**kwargs,
) -> str:
@ -134,6 +116,12 @@ class SequentialWorkflow:
Args:
task (str): The task for the agents to execute.
device (str): The device to use for the agents to execute.
all_cores (bool): Whether to use all cores.
all_gpus (bool): Whether to use all gpus.
device_id (int): The device id to use for the agents to execute.
no_use_clusterops (bool): Whether to use clusterops.
Returns:
str: The final result after processing through all agents.
@ -144,14 +132,14 @@ class SequentialWorkflow:
"""
try:
logger.info(
f"Executing task with dynamic flow: {self.flow}"
)
return self.agent_rearrange.run(
task,
task=task,
img=img,
device=device,
all_cpus=all_cpus,
auto_gpu=auto_gpu,
all_cores=all_cores,
device_id=device_id,
all_gpus=all_gpus,
no_use_clusterops=no_use_clusterops,
*args,
**kwargs,
)
@ -186,9 +174,6 @@ class SequentialWorkflow:
)
try:
logger.info(
f"Executing batch of tasks with dynamic flow: {self.flow}"
)
return [self.agent_rearrange.run(task) for task in tasks]
except Exception as e:
logger.error(
@ -214,9 +199,6 @@ class SequentialWorkflow:
raise ValueError("Task must be a non-empty string")
try:
logger.info(
f"Executing task with dynamic flow asynchronously: {self.flow}"
)
return await self.agent_rearrange.run_async(task)
except Exception as e:
logger.error(
@ -246,9 +228,6 @@ class SequentialWorkflow:
)
try:
logger.info(
f"Executing batch of tasks with dynamic flow concurrently: {self.flow}"
)
with ThreadPoolExecutor() as executor:
results = [
executor.submit(self.agent_rearrange.run, task)

@ -0,0 +1,135 @@
import random
import time
from typing import Any, Callable, Dict, List

from rich.console import Console
from rich.live import Live
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.table import Table
from rich.text import Text
class Formatter:
    """
    A class for formatting and printing rich text to the console.
    """

    def __init__(self):
        """
        Initializes the Formatter with a Rich Console instance.
        """
        self.console = Console()

    def print_panel(
        self, content: str, title: str = "", style: str = "bold blue"
    ) -> None:
        """
        Prints a rich panel to the console with a random color.

        Args:
            content (str): The content of the panel.
            title (str, optional): The title of the panel. Defaults to "".
            style (str, optional): Accepted for API compatibility but
                currently ignored — the panel style is always a random
                bold color chosen below.
        """
        # Fix: `import random` was previously re-executed on every call;
        # it is now a module-level import.
        colors = [
            "red",
            "green",
            "blue",
            "yellow",
            "magenta",
            "cyan",
            "white",
        ]
        random_color = random.choice(colors)

        panel = Panel(
            content, title=title, style=f"bold {random_color}"
        )
        self.console.print(panel)

    def print_table(
        self, title: str, data: Dict[str, List[str]]
    ) -> None:
        """
        Prints a rich table to the console.

        Args:
            title (str): The title of the table.
            data (Dict[str, List[str]]): A dictionary where keys are categories and values are lists of capabilities.
        """
        table = Table(show_header=True, header_style="bold magenta")
        table.add_column("Category", style="cyan")
        table.add_column("Capabilities", style="green")

        # Each value list is rendered one item per line in the second column.
        for category, items in data.items():
            table.add_row(category, "\n".join(items))

        self.console.print(f"\n🔥 {title}:", style="bold yellow")
        self.console.print(table)

    def print_progress(
        self,
        description: str,
        task_fn: Callable,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        """
        Prints a progress bar to the console and executes a task function.

        Args:
            description (str): The description of the task.
            task_fn (Callable): The function to execute.
            *args (Any): Arguments to pass to the task function.
            **kwargs (Any): Keyword arguments to pass to the task function.

        Returns:
            Any: The result of the task function.
        """
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
        ) as progress:
            task = progress.add_task(description, total=None)
            result = task_fn(*args, **kwargs)
            progress.update(task, completed=True)
        return result

    def print_panel_token_by_token(
        self,
        tokens: str,
        title: str = "Output",
        style: str = "bold cyan",
        delay: float = 0.01,
        by_word: bool = False,
    ) -> None:
        """
        Prints a string in real-time, token by token (character or word) inside a Rich panel.

        Args:
            tokens (str): The string to display in real-time.
            title (str): Title of the panel.
            style (str): Style for the text inside the panel.
            delay (float): Delay in seconds between displaying each token.
            by_word (bool): If True, display by words; otherwise, display by characters.
        """
        text = Text(style=style)

        # Split tokens into characters or words
        token_list = tokens.split() if by_word else tokens

        with Live(
            Panel(text, title=title, border_style=style),
            console=self.console,
            refresh_per_second=10,
        ) as live:
            for token in token_list:
                text.append(token + (" " if by_word else ""))
                live.update(
                    Panel(text, title=title, border_style=style)
                )
                time.sleep(delay)


# Module-level singleton used throughout the codebase.
formatter = Formatter()

def exec_callable_with_clusterops(
    device: str = "cpu",
    device_id: int = 1,
    all_cores: bool = True,
    all_gpus: bool = False,
    func: callable = None,
    enable_logging: bool = True,
    *args,
    **kwargs,
) -> Any:
    """
    Execute a callable on a specified device (CPU or GPU) via clusterops helpers.

    Args:
        device (str, optional): Target device, "cpu" or "gpu". Defaults to "cpu".
        device_id (int, optional): Index of the specific core/GPU to use when
            not running on all of them. Defaults to 1.
        all_cores (bool, optional): If True, uses all available CPU cores. Defaults to True.
        all_gpus (bool, optional): If True, uses all available GPUs. Defaults to False.
        func (callable): The function to execute.
        enable_logging (bool, optional): If True, enables logging. Defaults to True.
        *args: Additional positional arguments to be passed to the execution method.
        **kwargs: Additional keyword arguments to be passed to the execution method.

    Returns:
        Any: The result returned by ``func``.

    Raises:
        ValueError: If no callable is provided, or the device/configuration is invalid.
        Exception: Any exception raised by ``func`` is logged (when enabled) and re-raised.
    """
    if func is None:
        raise ValueError("A callable function must be provided")

    try:
        if enable_logging:
            logger.info(f"Attempting to run on device: {device}")
        device = device.lower()

        if device == "cpu":
            if enable_logging:
                logger.info("Device set to CPU")
            if all_cores:
                if enable_logging:
                    logger.info("Using all CPU cores")
                return execute_with_all_cpu_cores(
                    func, *args, **kwargs
                )
            if device_id is not None:
                if enable_logging:
                    logger.info(
                        f"Using specific CPU core: {device_id}"
                    )
                return execute_on_cpu(
                    device_id, func, *args, **kwargs
                )
        elif device == "gpu":
            if enable_logging:
                logger.info("Device set to GPU")
            if all_gpus:
                if enable_logging:
                    logger.info("Using all available GPUs")
                gpus = [int(gpu) for gpu in list_available_gpus()]
                return execute_on_multiple_gpus(
                    gpus, func, *args, **kwargs
                )
            if enable_logging:
                logger.info(f"Using GPU device ID: {device_id}")
            return execute_on_gpu(device_id, func, *args, **kwargs)
        else:
            # Unknown device string — fail fast with a clear error.
            raise ValueError(
                f"Invalid device specified: {device}"
            )
    except ValueError as e:
        if enable_logging:
            logger.error(
                f"Invalid device or configuration specified: {e}"
            )
        raise
    except Exception as e:
        if enable_logging:
            logger.error(f"An error occurred during execution: {e}")
        raise
