cort -> swarms.agents

pull/846/head^2
Kye Gomez 5 months ago
parent ea9ecd56da
commit 97e25637e9

@ -18,11 +18,8 @@ if __name__ == "__main__":
# Initialize the workflow with the list of agents # Initialize the workflow with the list of agents
workflow = ConcurrentWorkflow( workflow = ConcurrentWorkflow(
agents=agents, agents=agents,
metadata_output_path="agent_metadata_4.json",
output_type="list", output_type="list",
show_progress=False, max_loops=1,
max_loops=3,
interactive=True,
) )
# Define the task for all agents # Define the task for all agents

@ -1,643 +0,0 @@
"""
CEO -> Finds department leader
Department leader -> Finds employees
Employees -> Do the work
Todo
- Create schemas that enable the ceo to find the department leader or leaders
- CEO then distributes orders to department leaders or just one leader
- Department leader then distributes orders to employees
- Employees can choose to do the work or delegate to another employee or work together
- When the employees are done, they report back to the department leader
- Department leader then reports back to the ceo
- CEO then reports back to the user
Logic
- dynamically setup conversations for each department -- Feed context to each agent in the department
- Feed context to each agent in the department
"""
from typing import Callable, List, Union
from pydantic import BaseModel, Field
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.structs.ma_utils import list_all_agents
from swarms.utils.str_to_dict import str_to_dict
from swarms.utils.any_to_str import any_to_str
class Department(BaseModel):
name: str = Field(description="The name of the department")
description: str = Field(
description="A description of the department"
)
employees: List[Union[Agent, Callable]] = Field(
description="A list of employees in the department"
)
leader_name: str = Field(
description="The name of the leader of the department"
)
class Config:
arbitrary_types_allowed = True
CEO_SCHEMA = {
"name": "delegate_task_to_department",
"description": "CEO function to analyze and delegate tasks to appropriate department leaders",
"parameters": {
"type": "object",
"properties": {
"thought": {
"type": "string",
"description": "Reasoning about the task, its requirements, and potential approaches",
},
"plan": {
"type": "string",
"description": "Structured plan for how to accomplish the task across departments",
},
"tasks": {
"type": "object",
"properties": {
"task_description": {"type": "string"},
"selected_departments": {
"type": "array",
"items": {"type": "string"},
"description": "List of department names that should handle this task",
},
"selected_leaders": {
"type": "array",
"items": {"type": "string"},
"description": "List of department leaders to assign the task to",
},
"success_criteria": {"type": "string"},
},
"required": [
"task_description",
"selected_departments",
"selected_leaders",
],
},
},
"required": ["thought", "plan", "tasks"],
},
}
DEPARTMENT_LEADER_SCHEMA = {
"name": "manage_department_task",
"description": "Department leader function to break down and assign tasks to employees",
"parameters": {
"type": "object",
"properties": {
"task_management": {
"type": "object",
"properties": {
"original_task": {"type": "string"},
"subtasks": {
"type": "array",
"items": {
"type": "object",
"properties": {
"subtask_id": {"type": "string"},
"description": {"type": "string"},
"assigned_employees": {
"type": "array",
"items": {"type": "string"},
},
"estimated_duration": {
"type": "string"
},
"dependencies": {
"type": "array",
"items": {"type": "string"},
},
},
},
},
"progress_tracking": {
"type": "object",
"properties": {
"status": {
"type": "string",
"enum": [
"not_started",
"in_progress",
"completed",
],
},
"completion_percentage": {
"type": "number"
},
"blockers": {
"type": "array",
"items": {"type": "string"},
},
},
},
},
"required": ["original_task", "subtasks"],
}
},
"required": ["task_management"],
},
}
EMPLOYEE_SCHEMA = {
"name": "handle_assigned_task",
"description": "Employee function to process and execute assigned tasks",
"parameters": {
"type": "object",
"properties": {
"thought": {
"type": "string",
"description": "Reasoning about the task, its requirements, and potential approaches",
},
"plan": {
"type": "string",
"description": "Structured plan for how to accomplish the task across departments",
},
"task_execution": {
"type": "object",
"properties": {
"subtask_id": {"type": "string"},
"action_taken": {
"type": "string",
"enum": [
"execute",
"delegate",
"collaborate",
],
},
"execution_details": {
"type": "object",
"properties": {
"status": {
"type": "string",
"enum": [
"in_progress",
"completed",
"blocked",
],
},
"work_log": {"type": "string"},
"collaboration_partners": {
"type": "array",
"items": {"type": "string"},
},
"delegate_to": {"type": "string"},
"results": {"type": "string"},
"issues_encountered": {
"type": "array",
"items": {"type": "string"},
},
},
},
},
"required": [
"thought",
"plan",
"subtask_id",
"action_taken",
"execution_details",
],
},
},
"required": ["task_execution"],
},
}
# Status report schemas for the feedback loop
EMPLOYEE_REPORT_SCHEMA = {
"name": "submit_task_report",
"description": "Employee function to report task completion status to department leader",
"parameters": {
"type": "object",
"properties": {
"task_report": {
"type": "object",
"properties": {
"subtask_id": {"type": "string"},
"completion_status": {
"type": "string",
"enum": ["completed", "partial", "blocked"],
},
"work_summary": {"type": "string"},
"time_spent": {"type": "string"},
"challenges": {
"type": "array",
"items": {"type": "string"},
},
"next_steps": {"type": "string"},
},
"required": [
"subtask_id",
"completion_status",
"work_summary",
],
}
},
"required": ["task_report"],
},
}
DEPARTMENT_REPORT_SCHEMA = {
"name": "submit_department_report",
"description": "Department leader function to report department progress to CEO",
"parameters": {
"type": "object",
"properties": {
"department_report": {
"type": "object",
"properties": {
"department_name": {"type": "string"},
"task_summary": {"type": "string"},
"overall_status": {
"type": "string",
"enum": ["on_track", "at_risk", "completed"],
},
"completion_percentage": {"type": "number"},
"key_achievements": {
"type": "array",
"items": {"type": "string"},
},
"blockers": {
"type": "array",
"items": {"type": "string"},
},
"resource_needs": {
"type": "array",
"items": {"type": "string"},
},
"next_milestones": {
"type": "array",
"items": {"type": "string"},
},
},
"required": [
"department_name",
"task_summary",
"overall_status",
],
}
},
"required": ["department_report"],
},
}
CEO_FINAL_REPORT_SCHEMA = {
"name": "generate_final_report",
"description": "CEO function to compile final report for the user",
"parameters": {
"type": "object",
"properties": {
"final_report": {
"type": "object",
"properties": {
"task_overview": {"type": "string"},
"overall_status": {
"type": "string",
"enum": ["successful", "partial", "failed"],
},
"department_summaries": {
"type": "array",
"items": {
"type": "object",
"properties": {
"department": {"type": "string"},
"contribution": {"type": "string"},
"performance": {"type": "string"},
},
},
},
"final_results": {"type": "string"},
"recommendations": {
"type": "array",
"items": {"type": "string"},
},
"next_steps": {"type": "string"},
},
"required": [
"task_overview",
"overall_status",
"final_results",
],
}
},
"required": ["final_report"],
},
}
# # Example output schemas
# CEO_EXAMPLE_OUTPUT = {
# "thought": "This task requires coordination between the engineering and design departments to create a new feature. The engineering team will handle the backend implementation while design focuses on the user interface.",
# "plan": "1. Assign backend development to engineering department\n2. Assign UI/UX design to design department\n3. Set up regular sync meetings between departments\n4. Establish clear success criteria",
# "tasks": {
# "task_description": "Develop a new user authentication system with social login integration",
# "selected_departments": ["engineering", "design"],
# "selected_leaders": ["engineering_lead", "design_lead"],
# "success_criteria": "1. Social login working with 3 major providers\n2. UI/UX approved by design team\n3. Security audit passed\n4. Performance metrics met"
# }
# }
# DEPARTMENT_LEADER_EXAMPLE_OUTPUT = {
# "task_management": {
# "original_task": "Develop a new user authentication system with social login integration",
# "subtasks": [
# {
# "subtask_id": "ENG-001",
# "description": "Implement OAuth2 integration for Google",
# "assigned_employees": ["dev1", "dev2"],
# "estimated_duration": "3 days",
# "dependencies": ["DES-001"]
# },
# {
# "subtask_id": "ENG-002",
# "description": "Implement OAuth2 integration for Facebook",
# "assigned_employees": ["dev3"],
# "estimated_duration": "2 days",
# "dependencies": ["DES-001"]
# }
# ],
# "progress_tracking": {
# "status": "in_progress",
# "completion_percentage": 0.3,
# "blockers": ["Waiting for design team to provide UI mockups"]
# }
# }
# }
# EMPLOYEE_EXAMPLE_OUTPUT = {
# "thought": "The Google OAuth2 integration requires careful handling of token management and user data synchronization",
# "plan": "1. Set up Google OAuth2 credentials\n2. Implement token refresh mechanism\n3. Create user data sync pipeline\n4. Add error handling and logging",
# "task_execution": {
# "subtask_id": "ENG-001",
# "action_taken": "execute",
# "execution_details": {
# "status": "in_progress",
# "work_log": "Completed OAuth2 credential setup and initial token handling implementation",
# "collaboration_partners": ["dev2"],
# "delegate_to": None,
# "results": "Successfully implemented basic OAuth2 flow",
# "issues_encountered": ["Need to handle token refresh edge cases"]
# }
# }
# }
# EMPLOYEE_REPORT_EXAMPLE = {
# "task_report": {
# "subtask_id": "ENG-001",
# "completion_status": "partial",
# "work_summary": "Completed initial OAuth2 implementation, working on token refresh mechanism",
# "time_spent": "2 days",
# "challenges": ["Token refresh edge cases", "Rate limiting considerations"],
# "next_steps": "Implement token refresh mechanism and add rate limiting protection"
# }
# }
# DEPARTMENT_REPORT_EXAMPLE = {
# "department_report": {
# "department_name": "Engineering",
# "task_summary": "Making good progress on OAuth2 implementation, but waiting on design team for UI components",
# "overall_status": "on_track",
# "completion_percentage": 0.4,
# "key_achievements": [
# "Completed Google OAuth2 basic flow",
# "Set up secure token storage"
# ],
# "blockers": ["Waiting for UI mockups from design team"],
# "resource_needs": ["Additional QA resources for testing"],
# "next_milestones": [
# "Complete Facebook OAuth2 integration",
# "Implement token refresh mechanism"
# ]
# }
# }
# CEO_FINAL_REPORT_EXAMPLE = {
# "final_report": {
# "task_overview": "Successfully implemented new authentication system with social login capabilities",
# "overall_status": "successful",
# "department_summaries": [
# {
# "department": "Engineering",
# "contribution": "Implemented secure OAuth2 integrations and token management",
# "performance": "Excellent - completed all technical requirements"
# },
# {
# "department": "Design",
# "contribution": "Created intuitive UI/UX for authentication flows",
# "performance": "Good - delivered all required designs on time"
# }
# ],
# "final_results": "New authentication system is live and processing 1000+ logins per day",
# "recommendations": [
# "Add more social login providers",
# "Implement biometric authentication",
# "Add two-factor authentication"
# ],
# "next_steps": "Monitor system performance and gather user feedback for improvements"
# }
# }
class AutoCorp:
def __init__(
self,
name: str = "AutoCorp",
description: str = "A company that uses agents to automate tasks",
departments: List[Department] = [],
ceo: Agent = None,
):
self.name = name
self.description = description
self.departments = departments
self.ceo = ceo
self.conversation = Conversation()
# Check if the CEO and departments are set
self.reliability_check()
# Add departments to conversation
self.add_departments_to_conversation()
# Initialize the CEO agent
self.initialize_ceo_agent()
# Initialize the department leaders
self.setup_department_leaders()
# Initialize the department employees
self.department_employees_initialize()
def initialize_ceo_agent(self):
self.ceo.tools_list_dictionary = [
CEO_SCHEMA,
CEO_FINAL_REPORT_SCHEMA,
]
def setup_department_leaders(self):
self.department_leader_initialize()
self.initialize_department_leaders()
def department_leader_initialize(self):
"""Initialize each department leader with their department's context."""
for department in self.departments:
# Create a context dictionary for the department
department_context = {
"name": department.name,
"description": department.description,
"employees": list_all_agents(
department.employees,
self.conversation,
department.name,
False,
),
}
# Convert the context to a string
context_str = any_to_str(department_context)
# TODO: Add the department leader's tools and context
department.leader.system_prompt += f"""
You are the leader of the {department.name} department.
Department Context:
{context_str}
Your role is to:
1. Break down tasks into subtasks
2. Assign subtasks to appropriate employees
3. Track progress and manage blockers
4. Report back to the CEO
Use the provided tools to manage your department effectively.
"""
def department_employees_initialize(self):
"""Initialize each department leader with their department's context."""
for department in self.departments:
# Create a context dictionary for the department
department_context = {
"name": department.name,
"description": department.description,
"employees": list_all_agents(
department.employees,
self.conversation,
department.name,
False,
),
"leader": department.leader_name,
}
print(department_context)
# Convert the context to a string
context_str = any_to_str(department_context)
# Set the department leader's tools and context
department.employees.system_prompt += f"""
You are an employee of the {department.name} department.
Department Context:
{context_str}
Your role is to:
1. Break down tasks into subtasks
2. Assign subtasks to appropriate employees
3. Track progress and manage blockers
4. Report back to the CEO
Use the provided tools to manage your department effectively.
"""
def initialize_department_leaders(self):
# Use list comprehension for faster initialization
[
setattr(
dept.leader,
"tools_list_dictionary",
[DEPARTMENT_LEADER_SCHEMA],
)
for dept in self.departments
]
def reliability_check(self):
if self.ceo is None:
raise ValueError("CEO is not set")
if self.departments is None:
raise ValueError("No departments are set")
if len(self.departments) == 0:
raise ValueError("No departments are set")
def add_departments_to_conversation(self):
# Batch process departments using list comprehension
messages = [
{
"role": "System",
"content": f"Team: {dept.name}\nDescription: {dept.description}\nLeader: {dept.leader_name}\nAgents: {list_all_agents(dept.employees, self.conversation, dept.name, False)}",
}
for dept in self.departments
]
self.conversation.batch_add(messages)
# def add_department(self, department: Department):
# self.departments.append(department)
# def add_employee(self, employee: Union[Agent, Callable]):
# self.departments[-1].employees.append(employee)
# def add_ceo(self, ceo: Agent):
# self.ceo = ceo
# def add_employee_to_department(
# self, employee: Union[Agent, Callable], department: Department
# ):
# department.employees.append(employee)
# def add_leader_to_department(
# self, leader: Agent, department: Department
# ):
# department.leader = leader
# def add_department_to_auto_corp(self, department: Department):
# self.departments.append(department)
# def add_ceo_to_auto_corp(self, ceo: Agent):
# self.ceo = ceo
# def add_employee_to_ceo(self, employee: Union[Agent, Callable]):
# self.ceo.employees.append(employee)
def run(self, task: str):
self.ceo_to_department_leaders(task)
# Then the department leaders to employees
def ceo_to_department_leaders(self, task: str):
orders = self.ceo.run(
f"History: {self.conversation.get_str()}\n Your Current Task: {task}"
)
orders = str_to_dict(orders)
for department in orders["tasks"]["selected_departments"]:
department_leader = self.departments[department].leader
# Get the department leader to break down the task
outputs = department_leader.run(
orders["tasks"]["selected_leaders"]
)
# Add the department leader's response to the conversation
self.conversation.add(
role=f"{department_leader.name} from {department}",
content=outputs,
)

@ -1,291 +0,0 @@
import concurrent.futures
from typing import Dict, Optional
import secrets
import string
import uuid
from dotenv import load_dotenv
from swarms import Agent
import replicate
from swarms.utils.str_to_dict import str_to_dict
load_dotenv()
def generate_key(prefix: str = "run") -> str:
"""
Generates an API key similar to OpenAI's format (sk-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX).
Args:
prefix (str): The prefix for the API key. Defaults to "sk".
Returns:
str: An API key string in format: prefix-<48 random characters>
"""
# Create random string of letters and numbers
alphabet = string.ascii_letters + string.digits
random_part = "".join(secrets.choice(alphabet) for _ in range(28))
return f"{prefix}-{random_part}"
def _generate_media(
prompt: str = None, modalities: list = None
) -> Dict[str, str]:
"""
Generate media content (images or videos) based on text prompts using AI models.
Args:
prompt (str): Text description of the content to be generated
modalities (list): List of media types to generate (e.g., ["image", "video"])
Returns:
Dict[str, str]: Dictionary containing file paths of generated media
"""
if not prompt or not modalities:
raise ValueError("Prompt and modalities must be provided")
input = {"prompt": prompt}
results = {}
def _generate_image(input: Dict) -> str:
"""Generate an image and return the file path."""
output = replicate.run(
"black-forest-labs/flux-dev", input=input
)
file_paths = []
for index, item in enumerate(output):
unique_id = str(uuid.uuid4())
artifact = item.read()
file_path = f"output_{unique_id}_{index}.webp"
with open(file_path, "wb") as file:
file.write(artifact)
file_paths.append(file_path)
return file_paths
def _generate_video(input: Dict) -> str:
"""Generate a video and return the file path."""
output = replicate.run("luma/ray", input=input)
unique_id = str(uuid.uuid4())
artifact = output.read()
file_path = f"output_{unique_id}.mp4"
with open(file_path, "wb") as file:
file.write(artifact)
return file_path
for modality in modalities:
if modality == "image":
results["images"] = _generate_image(input)
elif modality == "video":
results["video"] = _generate_video(input)
else:
raise ValueError(f"Unsupported modality: {modality}")
print(results)
return results
def generate_media(
modalities: list,
prompt: Optional[str] = None,
count: int = 1,
) -> Dict:
with concurrent.futures.ThreadPoolExecutor(
max_workers=count
) as executor:
# Create list of identical tasks to run concurrently
futures = [
executor.submit(
_generate_media,
prompt=prompt, # Fix: Pass as keyword arguments
modalities=modalities,
)
for _ in range(count)
]
# Wait for all tasks to complete and collect results
results = [
future.result()
for future in concurrent.futures.as_completed(futures)
]
return {"results": results}
tools = [
{
"type": "function",
"function": {
"name": "generate_media",
"description": "Generate different types of media content (image, video, or music) based on text prompts using AI models.",
"parameters": {
"type": "object",
"properties": {
"modality": {
"type": "array",
"items": {
"type": "string",
"enum": ["image", "video", "music"],
},
"description": "The type of media content to generate",
},
"prompt": {
"type": "string",
"description": "Text description of the content to be generated. Specialize it for the modality at hand. For example, if you are generating an image, the prompt should be a description of the image you want to see. If you are generating a video, the prompt should be a description of the video you want to see. If you are generating music, the prompt should be a description of the music you want to hear.",
},
"count": {
"type": "integer",
"description": "Number of outputs to generate (1-4)",
},
},
"required": [
"modality",
"prompt",
"count",
],
},
},
}
]
MEDIA_GENERATION_SYSTEM_PROMPT = """
You are an expert AI Media Generation Assistant, specialized in crafting precise and effective prompts for generating images, videos, and music. Your role is to help users create high-quality media content by understanding their requests and translating them into optimal prompts.
GENERAL GUIDELINES:
- Always analyze the user's request carefully to determine the appropriate modality (image, video, or music)
- Maintain a balanced level of detail in prompts - specific enough to capture the desired outcome but not overly verbose
- Consider the technical limitations and capabilities of AI generation systems
- When unclear, ask for clarification about specific details or preferences
MODALITY-SPECIFIC GUIDELINES:
1. IMAGE GENERATION:
- Structure prompts with primary subject first, followed by style, mood, and technical specifications
- Include relevant art styles when specified (e.g., "digital art", "oil painting", "watercolor", "photorealistic")
- Consider composition elements (foreground, background, lighting, perspective)
- Use specific adjectives for clarity (instead of "beautiful", specify "vibrant", "ethereal", "gritty", etc.)
Example image prompts:
- "A serene Japanese garden at sunset, with cherry blossoms falling, painted in traditional ukiyo-e style, soft pastel colors"
- "Cyberpunk cityscape at night, neon lights reflecting in rain puddles, hyper-realistic digital art style"
2. VIDEO GENERATION:
- Describe the sequence of events clearly
- Specify camera movements if relevant (pan, zoom, tracking shot)
- Include timing and transitions when necessary
- Focus on dynamic elements and motion
Example video prompts:
- "Timelapse of a flower blooming in a garden, close-up shot, soft natural lighting, 10-second duration"
- "Drone shot flying through autumn forest, camera slowly rising above the canopy, revealing mountains in the distance"
3. MUSIC GENERATION:
- Specify genre, tempo, and mood
- Mention key instruments or sounds
- Include emotional qualities and intensity
- Reference similar artists or styles if relevant
Example music prompts:
- "Calm ambient electronic music with soft synthesizer pads, gentle piano melodies, 80 BPM, suitable for meditation"
- "Upbeat jazz fusion track with prominent bass line, dynamic drums, and horn section, inspired by Weather Report"
COUNT HANDLING:
- When multiple outputs are requested (1-4), maintain consistency while introducing subtle variations
- For images: Vary composition or perspective while maintaining style
- For videos: Adjust camera angles or timing while keeping the core concept
- For music: Modify instrument arrangements or tempo while preserving the genre and mood
PROMPT OPTIMIZATION PROCESS:
1. Identify core requirements from user input
2. Determine appropriate modality
3. Add necessary style and technical specifications
4. Adjust detail level based on complexity
5. Consider count and create variations if needed
EXAMPLES OF HANDLING USER REQUESTS:
User: "I want a fantasy landscape"
Assistant response: {
"modality": "image",
"prompt": "Majestic fantasy landscape with floating islands, crystal waterfalls, and ancient magical ruins, ethereal lighting, digital art style with rich colors",
"count": 1
}
User: "Create 3 variations of a peaceful nature scene"
Assistant response: {
"modality": "image",
"prompt": "Tranquil forest clearing with morning mist, sunbeams filtering through ancient trees, photorealistic style with soft natural lighting",
"count": 1
}
IMPORTANT CONSIDERATIONS:
- Avoid harmful, unethical, or inappropriate content
- Respect copyright and intellectual property guidelines
- Maintain consistency with brand guidelines when specified
- Consider technical limitations of current AI generation systems
"""
# Initialize the agent with the new system prompt
agent = Agent(
agent_name="Media-Generation-Agent",
agent_description="AI Media Generation Assistant",
system_prompt=MEDIA_GENERATION_SYSTEM_PROMPT,
max_loops=1,
tools_list_dictionary=tools,
output_type="final",
)
def create_agent(task: str):
output = str_to_dict(agent.run(task))
print(output)
print(type(output))
prompt = output["prompt"]
count = output["count"]
modalities = output["modality"]
output = generate_media(
modalities=modalities,
prompt=prompt,
count=count,
)
run_id = generate_key()
total_cost = 0
for modality in modalities:
if modality == "image":
total_cost += 0.1
elif modality == "video":
total_cost += 1
result = {
"id": run_id,
"success": True,
"prompt": prompt,
"count": count,
"modality": modalities,
"total_cost": total_cost,
}
return result
if __name__ == "__main__":
task = "Create 3 super kawaii variations of a magical Chinese mountain garden scene in anime style! 🌸✨ Include adorable elements like: cute koi fish swimming in crystal ponds, fluffy clouds floating around misty peaks, tiny pagodas with twinkling lights, and playful pandas hiding in bamboo groves. Make it extra magical with sparkles and soft pastel colors! Create both a video and an image for each variation. Just 1."
output = create_agent(task)
print("✨ Yay! Here's your super cute creation! ✨")
print(output)

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry] [tool.poetry]
name = "swarms" name = "swarms"
version = "7.7.7" version = "7.7.8"
description = "Swarms - TGSC" description = "Swarms - TGSC"
license = "MIT" license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"] authors = ["Kye Gomez <kye@apac.ai>"]

@ -0,0 +1,173 @@
from swarms import Agent
from typing import List
# System prompt for REACT agent
REACT_AGENT_PROMPT = """
You are a REACT (Reason, Act, Observe) agent designed to solve tasks through an iterative process of reasoning and action. You maintain memory of previous steps to build upon past actions and observations.
Your process follows these key components:
1. MEMORY: Review and utilize previous steps
- Access and analyze previous observations
- Build upon past thoughts and plans
- Learn from previous actions
- Use historical context to make better decisions
2. OBSERVE: Analyze current state
- Consider both new information and memory
- Identify relevant patterns from past steps
- Note any changes or progress made
- Evaluate success of previous actions
3. THINK: Process and reason
- Combine new observations with historical knowledge
- Consider how past steps influence current decisions
- Identify patterns and learning opportunities
- Plan improvements based on previous outcomes
4. PLAN: Develop next steps
- Create strategies that build on previous success
- Avoid repeating unsuccessful approaches
- Consider long-term goals and progress
- Maintain consistency with previous actions
5. ACT: Execute with context
- Implement actions that progress from previous steps
- Build upon successful past actions
- Adapt based on learned experiences
- Maintain continuity in approach
For each step, you should:
- Reference relevant previous steps
- Show how current decisions relate to past actions
- Demonstrate learning and adaptation
- Maintain coherent progression toward the goal
Your responses should be structured, logical, and show clear reasoning that builds upon previous steps."""
# Schema for REACT agent responses
react_agent_schema = {
"type": "function",
"function": {
"name": "generate_react_response",
"description": "Generates a structured REACT agent response with memory of previous steps",
"parameters": {
"type": "object",
"properties": {
"memory_reflection": {
"type": "string",
"description": "Analysis of previous steps and their influence on current thinking",
},
"observation": {
"type": "string",
"description": "Current state observation incorporating both new information and historical context",
},
"thought": {
"type": "string",
"description": "Reasoning that builds upon previous steps and current observation",
},
"plan": {
"type": "string",
"description": "Structured plan that shows progression from previous actions",
},
"action": {
"type": "string",
"description": "Specific action that builds upon previous steps and advances toward the goal",
},
},
"required": [
"memory_reflection",
"observation",
"thought",
"plan",
"action",
],
},
},
}
class ReactAgent:
def __init__(
self,
name: str = "react-agent-o1",
description: str = "A react agent that uses o1 preview to solve tasks",
model_name: str = "openai/gpt-4o",
max_loops: int = 1,
):
self.name = name
self.description = description
self.model_name = model_name
self.max_loops = max_loops
self.agent = Agent(
agent_name=self.name,
agent_description=self.description,
model_name=self.model_name,
max_loops=1,
tools_list_dictionary=[react_agent_schema],
output_type="final",
)
# Initialize memory for storing steps
self.memory: List[str] = []
def step(self, task: str) -> str:
"""Execute a single step of the REACT process.
Args:
task: The task description or current state
Returns:
String response from the agent
"""
response = self.agent.run(task)
print(response)
return response
def run(self, task: str, *args, **kwargs) -> List[str]:
"""Run the REACT agent for multiple steps with memory.
Args:
task: The initial task description
*args: Additional positional arguments
**kwargs: Additional keyword arguments
Returns:
List of all steps taken as strings
"""
# Reset memory at the start of a new run
self.memory = []
current_task = task
for i in range(self.max_loops):
print(f"\nExecuting step {i+1}/{self.max_loops}")
step_result = self.step(current_task)
print(step_result)
# Store step in memory
self.memory.append(step_result)
# Update task with previous response and memory context
memory_context = (
"\n\nMemory of previous steps:\n"
+ "\n".join(
f"Step {j+1}:\n{step}"
for j, step in enumerate(self.memory)
)
)
current_task = f"Previous response:\n{step_result}\n{memory_context}\n\nContinue with the original task: {task}"
return self.memory
# if __name__ == "__main__":
# agent = ReactAgent(
# max_loops=1
# ) # Increased max_loops to see the iteration
# result = agent.run(
# "Write a short story about a robot that can fly."
# )
# print(result)

@ -562,7 +562,7 @@ class Agent:
if self.react_on is True: if self.react_on is True:
self.system_prompt += REACT_SYS_PROMPT self.system_prompt += REACT_SYS_PROMPT
if self.max_loops > 1: if self.max_loops >= 2:
self.system_prompt += generate_reasoning_prompt( self.system_prompt += generate_reasoning_prompt(
self.max_loops self.max_loops
) )
@ -1044,14 +1044,14 @@ class Agent:
): ):
loop_count += 1 loop_count += 1
if self.max_loops > 1: if self.max_loops >= 2:
self.short_memory.add( self.short_memory.add(
role=self.agent_name, role=self.agent_name,
content=f"Current Internal Reasoning Loop: {loop_count}/{self.max_loops}", content=f"Current Internal Reasoning Loop: {loop_count}/{self.max_loops}",
) )
# If it is the final loop, then add the final loop message # If it is the final loop, then add the final loop message
if loop_count == self.max_loops: if loop_count >= 2 and loop_count == self.max_loops:
self.short_memory.add( self.short_memory.add(
role=self.agent_name, role=self.agent_name,
content=f"🎉 Final Internal Reasoning Loop: {loop_count}/{self.max_loops} Prepare your comprehensive response.", content=f"🎉 Final Internal Reasoning Loop: {loop_count}/{self.max_loops} Prepare your comprehensive response.",

@ -4,8 +4,6 @@ from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache from functools import lru_cache
from typing import Any, Callable, Dict, List, Optional, Union from typing import Any, Callable, Dict, List, Optional, Union
from tqdm import tqdm
from swarms.structs.agent import Agent from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm from swarms.structs.base_swarm import BaseSwarm
from swarms.structs.conversation import Conversation from swarms.structs.conversation import Conversation
@ -22,9 +20,7 @@ class ConcurrentWorkflow(BaseSwarm):
""" """
Represents a concurrent workflow that executes multiple agents concurrently in a production-grade manner. Represents a concurrent workflow that executes multiple agents concurrently in a production-grade manner.
Features include: Features include:
- Interactive model support
- Caching for repeated prompts - Caching for repeated prompts
- Optional progress tracking
- Enhanced error handling and retries - Enhanced error handling and retries
- Input validation - Input validation
@ -39,11 +35,9 @@ class ConcurrentWorkflow(BaseSwarm):
return_str_on (bool): Flag indicating whether to return the output as a string. Defaults to False. return_str_on (bool): Flag indicating whether to return the output as a string. Defaults to False.
auto_generate_prompts (bool): Flag indicating whether to auto-generate prompts for agents. Defaults to False. auto_generate_prompts (bool): Flag indicating whether to auto-generate prompts for agents. Defaults to False.
return_entire_history (bool): Flag indicating whether to return the entire conversation history. Defaults to False. return_entire_history (bool): Flag indicating whether to return the entire conversation history. Defaults to False.
interactive (bool): Flag indicating whether to enable interactive mode. Defaults to False.
cache_size (int): The size of the cache. Defaults to 100. cache_size (int): The size of the cache. Defaults to 100.
max_retries (int): The maximum number of retry attempts. Defaults to 3. max_retries (int): The maximum number of retry attempts. Defaults to 3.
retry_delay (float): The delay between retry attempts in seconds. Defaults to 1.0. retry_delay (float): The delay between retry attempts in seconds. Defaults to 1.0.
show_progress (bool): Flag indicating whether to show progress. Defaults to False.
Raises: Raises:
ValueError: If the list of agents is empty or if the description is empty. ValueError: If the list of agents is empty or if the description is empty.
@ -59,13 +53,10 @@ class ConcurrentWorkflow(BaseSwarm):
return_str_on (bool): Flag indicating whether to return the output as a string. return_str_on (bool): Flag indicating whether to return the output as a string.
auto_generate_prompts (bool): Flag indicating whether to auto-generate prompts for agents. auto_generate_prompts (bool): Flag indicating whether to auto-generate prompts for agents.
return_entire_history (bool): Flag indicating whether to return the entire conversation history. return_entire_history (bool): Flag indicating whether to return the entire conversation history.
interactive (bool): Flag indicating whether to enable interactive mode.
cache_size (int): The size of the cache. cache_size (int): The size of the cache.
max_retries (int): The maximum number of retry attempts. max_retries (int): The maximum number of retry attempts.
retry_delay (float): The delay between retry attempts in seconds. retry_delay (float): The delay between retry attempts in seconds.
show_progress (bool): Flag indicating whether to show progress.
_cache (dict): The cache for storing agent outputs. _cache (dict): The cache for storing agent outputs.
_progress_bar (tqdm): The progress bar for tracking execution.
""" """
def __init__( def __init__(
@ -80,11 +71,9 @@ class ConcurrentWorkflow(BaseSwarm):
return_str_on: bool = False, return_str_on: bool = False,
auto_generate_prompts: bool = False, auto_generate_prompts: bool = False,
return_entire_history: bool = False, return_entire_history: bool = False,
interactive: bool = False,
cache_size: int = 100, cache_size: int = 100,
max_retries: int = 3, max_retries: int = 3,
retry_delay: float = 1.0, retry_delay: float = 1.0,
show_progress: bool = False,
*args, *args,
**kwargs, **kwargs,
): ):
@ -107,21 +96,14 @@ class ConcurrentWorkflow(BaseSwarm):
self.output_type = output_type self.output_type = output_type
self.return_entire_history = return_entire_history self.return_entire_history = return_entire_history
self.tasks = [] # Initialize tasks list self.tasks = [] # Initialize tasks list
self.interactive = interactive
self.cache_size = cache_size self.cache_size = cache_size
self.max_retries = max_retries self.max_retries = max_retries
self.retry_delay = retry_delay self.retry_delay = retry_delay
self.show_progress = show_progress
self._cache = {} self._cache = {}
self._progress_bar = None
self.reliability_check() self.reliability_check()
self.conversation = Conversation() self.conversation = Conversation()
def disable_agent_prints(self):
for agent in self.agents:
agent.no_print = False
def reliability_check(self): def reliability_check(self):
try: try:
formatter.print_panel( formatter.print_panel(
@ -186,44 +168,6 @@ class ConcurrentWorkflow(BaseSwarm):
"""Cached version of agent execution to avoid redundant computations""" """Cached version of agent execution to avoid redundant computations"""
return self.agents[agent_id].run(task=task) return self.agents[agent_id].run(task=task)
def enable_progress_bar(self):
"""Enable progress bar display"""
self.show_progress = True
def disable_progress_bar(self):
"""Disable progress bar display"""
if self._progress_bar:
self._progress_bar.close()
self._progress_bar = None
self.show_progress = False
def _create_progress_bar(self, total: int):
"""Create a progress bar for tracking execution"""
if self.show_progress:
try:
self._progress_bar = tqdm(
total=total,
desc="Processing tasks",
unit="task",
disable=not self.show_progress,
ncols=100,
bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]",
)
except Exception as e:
logger.warning(f"Failed to create progress bar: {e}")
self.show_progress = False
self._progress_bar = None
return self._progress_bar
def _update_progress(self, increment: int = 1):
"""Update the progress bar"""
if self._progress_bar and self.show_progress:
try:
self._progress_bar.update(increment)
except Exception as e:
logger.warning(f"Failed to update progress bar: {e}")
self.disable_progress_bar()
def _validate_input(self, task: str) -> bool: def _validate_input(self, task: str) -> bool:
"""Validate input task""" """Validate input task"""
if not isinstance(task, str): if not isinstance(task, str):
@ -232,38 +176,6 @@ class ConcurrentWorkflow(BaseSwarm):
raise ValueError("Task cannot be empty") raise ValueError("Task cannot be empty")
return True return True
def _handle_interactive(self, task: str) -> str:
"""Handle interactive mode for task input"""
if self.interactive:
from swarms.utils.formatter import formatter
# Display current task in a panel
formatter.print_panel(
content=f"Current task: {task}",
title="Task Status",
style="bold blue",
)
# Get user input with formatted prompt
formatter.print_panel(
content="Do you want to modify this task? (y/n/q to quit): ",
title="User Input",
style="bold green",
)
response = input().lower()
if response == "q":
return None
elif response == "y":
formatter.print_panel(
content="Enter new task: ",
title="New Task Input",
style="bold yellow",
)
new_task = input()
return new_task
return task
def _run_with_retry( def _run_with_retry(
self, agent: Agent, task: str, img: str = None self, agent: Agent, task: str, img: str = None
) -> Any: ) -> Any:
@ -286,68 +198,69 @@ class ConcurrentWorkflow(BaseSwarm):
self.retry_delay * (attempt + 1) self.retry_delay * (attempt + 1)
) # Exponential backoff ) # Exponential backoff
def _process_agent(
self, agent: Agent, task: str, img: str = None
) -> Any:
"""
Process a single agent with caching and error handling.
Args:
agent: The agent to process
task: Task to execute
img: Optional image input
Returns:
The agent's output
"""
try:
# Fast path - check cache first
cache_key = f"{task}_{agent.agent_name}"
if cache_key in self._cache:
output = self._cache[cache_key]
else:
# Slow path - run agent and update cache
output = self._run_with_retry(agent, task, img)
if len(self._cache) >= self.cache_size:
self._cache.pop(next(iter(self._cache)))
self._cache[cache_key] = output
return output
except Exception as e:
logger.error(
f"Error running agent {agent.agent_name}: {e}"
)
raise
def _run( def _run(
self, task: str, img: str = None, *args, **kwargs self, task: str, img: str = None, *args, **kwargs
) -> Union[Dict[str, Any], str]: ) -> Union[Dict[str, Any], str]:
""" """
Enhanced run method with caching, progress tracking, and better error handling Enhanced run method with parallel execution.
""" """
# Fast validation
# Validate and potentially modify task
self._validate_input(task) self._validate_input(task)
task = self._handle_interactive(task)
# Add task to conversation
self.conversation.add("User", task) self.conversation.add("User", task)
# Create progress bar if enabled
if self.show_progress:
self._create_progress_bar(len(self.agents))
def run_agent(
agent: Agent, task: str, img: str = None
) -> Any:
try:
# Check cache first
cache_key = f"{task}_{agent.agent_name}"
if cache_key in self._cache:
output = self._cache[cache_key]
else:
output = self._run_with_retry(agent, task, img)
# Update cache
if len(self._cache) >= self.cache_size:
self._cache.pop(next(iter(self._cache)))
self._cache[cache_key] = output
self._update_progress()
return output
except Exception as e:
logger.error(
f"Error running agent {agent.agent_name}: {e}"
)
self._update_progress()
raise
try: try:
# Parallel execution with optimized thread pool
with ThreadPoolExecutor( with ThreadPoolExecutor(
max_workers=self.max_workers max_workers=self.max_workers
) as executor: ) as executor:
list( futures = [
executor.map( executor.submit(
lambda agent: run_agent(agent, task), self._process_agent, agent, task, img
self.agents,
) )
) for agent in self.agents
finally: ]
if self._progress_bar and self.show_progress: # Wait for all futures to complete
try: for future in futures:
self._progress_bar.close() future.result()
except Exception as e:
logger.warning( except Exception as e:
f"Failed to close progress bar: {e}" logger.error(f"An error occurred during execution: {e}")
) raise e
finally:
self._progress_bar = None
return history_output_formatter( return history_output_formatter(
self.conversation, self.conversation,
@ -362,20 +275,11 @@ class ConcurrentWorkflow(BaseSwarm):
**kwargs, **kwargs,
) -> Any: ) -> Any:
""" """
Executes the agent's run method on a specified device with optional interactive mode. Executes the agent's run method with parallel execution.
This method attempts to execute the agent's run method on a specified device, either CPU or GPU.
It supports both standard execution and interactive mode where users can modify tasks and continue
the workflow interactively.
Args: Args:
task (Optional[str], optional): The task to be executed. Defaults to None. task (Optional[str], optional): The task to be executed. Defaults to None.
img (Optional[str], optional): The image to be processed. Defaults to None. img (Optional[str], optional): The image to be processed. Defaults to None.
is_last (bool, optional): Indicates if this is the last task. Defaults to False.
device (str, optional): The device to use for execution. Defaults to "cpu".
device_id (int, optional): The ID of the GPU to use if device is set to "gpu". Defaults to 0.
all_cores (bool, optional): If True, uses all available CPU cores. Defaults to True.
all_gpus (bool, optional): If True, uses all available GPUS. Defaults to True.
*args: Additional positional arguments to be passed to the execution method. *args: Additional positional arguments to be passed to the execution method.
**kwargs: Additional keyword arguments to be passed to the execution method. **kwargs: Additional keyword arguments to be passed to the execution method.
@ -383,117 +287,27 @@ class ConcurrentWorkflow(BaseSwarm):
Any: The result of the execution. Any: The result of the execution.
Raises: Raises:
ValueError: If an invalid device is specified. ValueError: If task validation fails.
Exception: If any other error occurs during execution. Exception: If any other error occurs during execution.
""" """
if task is not None: if task is not None:
self.tasks.append(task) self.tasks.append(task)
try: try:
# Handle interactive mode outputs = self._run(task, img, *args, **kwargs)
if self.interactive: return outputs
current_task = task
loop_count = 0
while loop_count < self.max_loops:
if (
self.max_loops is not None
and loop_count >= self.max_loops
):
formatter.print_panel(
content=f"Maximum number of loops ({self.max_loops}) reached.",
title="Session Complete",
style="bold red",
)
break
if current_task is None:
formatter.print_panel(
content="Enter your task (or 'q' to quit): ",
title="Task Input",
style="bold blue",
)
current_task = input()
if current_task.lower() == "q":
break
# Run the workflow with the current task
try:
outputs = self._run(
current_task, img, *args, **kwargs
)
formatter.print_panel(
content=str(outputs),
title="Workflow Result",
style="bold green",
)
except Exception as e:
formatter.print_panel(
content=f"Error: {str(e)}",
title="Error",
style="bold red",
)
# Ask if user wants to continue
formatter.print_panel(
content="Do you want to continue with a new task? (y/n): ",
title="Continue Session",
style="bold yellow",
)
if input().lower() != "y":
break
current_task = None
loop_count += 1
formatter.print_panel(
content="Interactive session ended.",
title="Session Complete",
style="bold blue",
)
return outputs
else:
# Standard non-interactive execution
outputs = self._run(task, img, *args, **kwargs)
return outputs
except ValueError as e:
logger.error(f"Invalid device specified: {e}")
raise e
except Exception as e: except Exception as e:
logger.error(f"An error occurred during execution: {e}") logger.error(f"An error occurred during execution: {e}")
raise e raise e
def run_batched(self, tasks: List[str]) -> Any: def run_batched(self, tasks: List[str]) -> Any:
""" """
Enhanced batched execution with progress tracking Enhanced batched execution
""" """
if not tasks: if not tasks:
raise ValueError("Tasks list cannot be empty") raise ValueError("Tasks list cannot be empty")
results = [] return [self.run(task) for task in tasks]
# Create progress bar if enabled
if self.show_progress:
self._create_progress_bar(len(tasks))
try:
for task in tasks:
result = self.run(task)
results.append(result)
self._update_progress()
finally:
if self._progress_bar and self.show_progress:
try:
self._progress_bar.close()
except Exception as e:
logger.warning(
f"Failed to close progress bar: {e}"
)
finally:
self._progress_bar = None
return results
def clear_cache(self): def clear_cache(self):
"""Clear the task cache""" """Clear the task cache"""

Loading…
Cancel
Save