parent
d73f1a68c4
commit
4b19e40c7b
@ -0,0 +1,126 @@
|
||||
import os
|
||||
import json
|
||||
from pydantic import BaseModel, Field
|
||||
from swarm_models import OpenAIFunctionCaller
|
||||
from dotenv import load_dotenv
|
||||
from typing import Any, List
|
||||
|
||||
load_dotenv()
|
||||
|
||||
|
||||
class Flow(BaseModel):
|
||||
id: str = Field(
|
||||
description="A unique identifier for the flow. This should be a short, descriptive name that captures the main purpose of the flow. Use - to separate words and make it lowercase."
|
||||
)
|
||||
plan: str = Field(
|
||||
description="The comprehensive plan detailing how the flow will accomplish the given task. This should include the high-level strategy, key milestones, and expected outcomes. The plan should clearly articulate what the overall goal is, what success looks like, and how progress will be measured throughout execution."
|
||||
)
|
||||
failures_prediction: str = Field(
|
||||
description="A thorough analysis of potential failure modes and mitigation strategies. This should identify technical risks, edge cases, error conditions, and possible points of failure in the flow. For each identified risk, include specific preventive measures, fallback approaches, and recovery procedures to ensure robustness and reliability."
|
||||
)
|
||||
rationale: str = Field(
|
||||
description="The detailed reasoning and justification for why this specific flow design is optimal for the given task. This should explain the key architectural decisions, tradeoffs considered, alternatives evaluated, and why this approach best satisfies the requirements. Include both technical and business factors that influenced the design."
|
||||
)
|
||||
flow: str = Field(
|
||||
description="The precise execution flow defining how agents interact and coordinate. Use -> to indicate sequential processing where one agent must complete before the next begins (e.g. agent1 -> agent2 -> agent3). Use , to indicate parallel execution where multiple agents can run simultaneously (e.g. agent1 -> agent2, agent3, agent4). The flow should clearly show the dependencies and parallelization opportunities between agents. You must only use the agent names provided in the task description do not make up new agent names and do not use any other formatting."
|
||||
)
|
||||
|
||||
|
||||
class AgentRearrangeBuilder(BaseModel):
|
||||
name: str = Field(
|
||||
description="The name of the swarm. This should be a short, descriptive name that captures the main purpose of the flow."
|
||||
)
|
||||
description: str = Field(
|
||||
description="A brief description of the swarm. This should be a concise summary of the main purpose of the swarm."
|
||||
)
|
||||
flows: List[Flow] = Field(
|
||||
description="A list of flows that are optimal for the given task. Each flow should be a detailed plan, failure prediction, rationale, and execution flow."
|
||||
)
|
||||
swarm_flow: str = Field(
|
||||
description="The flow defining how each team should communicate and coordinate with eachother.Use -> to indicate sequential processing where one id must complete before the next begins (e.g. team1 -> team2 -> team3). Use , to indicate parallel execution where multiple teams can run simultaneously (e.g. team1 -> team2, team3, team4). The flow should clearly show the dependencies and parallelization opportunities between teams. You must only use the team names provided in the id do not make up new team names and do not use any other formatting."
|
||||
)
|
||||
|
||||
|
||||
# def flow_generator(task: str) -> Flow:
|
||||
|
||||
|
||||
def setup_model(base_model: BaseModel = Flow):
|
||||
model = OpenAIFunctionCaller(
|
||||
system_prompt="""You are an expert flow architect specializing in designing multi-agent workflows. Your role is to analyze tasks and create optimal execution flows that coordinate multiple AI agents effectively.
|
||||
|
||||
When given a task, you will:
|
||||
1. Develop a comprehensive plan breaking down the task into logical steps
|
||||
2. Carefully consider potential failure modes and build in robust error handling
|
||||
3. Provide clear rationale for your architectural decisions and agent coordination strategy
|
||||
4. Design a precise flow showing both sequential dependencies and parallel execution opportunities
|
||||
|
||||
Your flows should maximize:
|
||||
- Efficiency through smart parallelization
|
||||
- Reliability through thorough error handling
|
||||
- Clarity through well-structured agent interactions
|
||||
- Effectiveness through strategic task decomposition
|
||||
|
||||
Format your flow using -> for sequential steps and , for parallel execution. Be specific about agent roles and interactions.
|
||||
""",
|
||||
base_model=base_model,
|
||||
openai_api_key=os.getenv("OPENAI_API_KEY"),
|
||||
temperature=0.5,
|
||||
)
|
||||
return model
|
||||
|
||||
|
||||
def generate_flow(task: str) -> Any:
|
||||
model = setup_model()
|
||||
flow = model.run(task)
|
||||
print(json.dumps(flow, indent=4))
|
||||
return flow
|
||||
|
||||
|
||||
def generate_agent_rearrange(task: str) -> Any:
|
||||
model = setup_model(base_model=AgentRearrangeBuilder)
|
||||
flow = model.run(task)
|
||||
print(json.dumps(flow, indent=4))
|
||||
return flow
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Basic patient diagnosis flow
|
||||
# generate_flow("Diagnose a patient's symptoms and create a treatment plan. You have 3 agents to use: Diagnostician, Specialist, CareCoordinator")
|
||||
|
||||
# # Complex multi-condition case
|
||||
# generate_flow("""Handle a complex patient case with multiple chronic conditions requiring ongoing care coordination.
|
||||
# The patient has diabetes, heart disease, and chronic pain.
|
||||
# Create a comprehensive diagnosis and treatment plan.
|
||||
# You have 3 agents to use: Diagnostician, Specialist, CareCoordinator""")
|
||||
|
||||
# # Emergency trauma case
|
||||
# generate_flow("""Process an emergency trauma case requiring rapid diagnosis and immediate intervention.
|
||||
# Patient presents with multiple injuries from a car accident.
|
||||
# Develop immediate and long-term treatment plans.
|
||||
# You have 3 agents to use: Diagnostician, Specialist, CareCoordinator""")
|
||||
|
||||
# # Long-term care planning
|
||||
# generate_flow("""Design a 6-month care plan for an elderly patient with declining cognitive function.
|
||||
# Include regular assessments, specialist consultations, and family coordination.
|
||||
# You have 3 agents to use: Diagnostician, Specialist, CareCoordinator""")
|
||||
|
||||
# # Mental health assessment
|
||||
# generate_flow("""Conduct a comprehensive mental health assessment and develop treatment strategy.
|
||||
# Patient shows signs of depression and anxiety with possible underlying conditions.
|
||||
# Create both immediate intervention and long-term support plans.
|
||||
# You have 3 agents to use: Diagnostician, Specialist, CareCoordinator""")
|
||||
|
||||
generate_agent_rearrange(
|
||||
"""Build a complete automated hedge fund system.
|
||||
Design and implement a sophisticated trading strategy incorporating multiple asset classes,
|
||||
risk management protocols, and automated execution systems.
|
||||
The system should include:
|
||||
- Market analysis and research capabilities
|
||||
- Portfolio optimization and risk management
|
||||
- Automated trade execution and settlement
|
||||
- Compliance and regulatory monitoring
|
||||
- Performance tracking and reporting
|
||||
- Fund operations and administration
|
||||
Create a comprehensive architecture that integrates all these components into a fully automated system."""
|
||||
)
|
||||
|
@ -1,232 +0,0 @@
|
||||
from pydantic import BaseModel, Field
|
||||
import yaml
|
||||
import json
|
||||
from swarms.utils.loguru_logger import logger
|
||||
from typing import Any, Dict
|
||||
from typing import Type
|
||||
from dataclasses import is_dataclass, fields
|
||||
|
||||
|
||||
def get_type_name(typ: Type) -> str:
|
||||
"""Map Python types to simple string representations."""
|
||||
if hasattr(typ, "__name__"):
|
||||
return typ.__name__
|
||||
return str(typ)
|
||||
|
||||
|
||||
def create_yaml_schema_from_dict(
|
||||
data: Dict[str, Any], model_class: Type
|
||||
) -> str:
|
||||
"""
|
||||
Generate a YAML schema based on a dictionary and a class (can be a Pydantic model, regular class, or dataclass).
|
||||
|
||||
Args:
|
||||
data: The dictionary with key-value pairs where keys are attribute names and values are example data.
|
||||
model_class: The class which the data should conform to, used for obtaining type information.
|
||||
|
||||
Returns:
|
||||
A string containing the YAML schema.
|
||||
|
||||
Example usage:
|
||||
>>> data = {'name': 'Alice', 'age: 30, 'is_active': True}
|
||||
>>> print(create_yaml_schema_from_dict(data, User))
|
||||
|
||||
"""
|
||||
schema = {}
|
||||
if is_dataclass(model_class):
|
||||
for field in fields(model_class):
|
||||
schema[field.name] = {
|
||||
"type": get_type_name(field.type),
|
||||
"default": (
|
||||
field.default
|
||||
if field.default != field.default_factory
|
||||
else None
|
||||
),
|
||||
"description": field.metadata.get(
|
||||
"description", "No description provided"
|
||||
),
|
||||
}
|
||||
elif isinstance(model_class, BaseModel):
|
||||
for field_name, model_field in model_class.__fields__.items():
|
||||
field_info = model_field.field_info
|
||||
schema[field_name] = {
|
||||
"type": get_type_name(model_field.outer_type_),
|
||||
"default": field_info.default,
|
||||
"description": (
|
||||
field_info.description
|
||||
or "No description provided."
|
||||
),
|
||||
}
|
||||
else:
|
||||
# Fallback for regular classes (non-dataclass, non-Pydantic)
|
||||
for attr_name, attr_value in data.items():
|
||||
attr_type = type(attr_value)
|
||||
schema[attr_name] = {
|
||||
"type": get_type_name(attr_type),
|
||||
"description": "No description provided",
|
||||
}
|
||||
|
||||
return yaml.safe_dump(schema, sort_keys=False)
|
||||
|
||||
|
||||
def pydantic_type_to_yaml_schema(pydantic_type):
|
||||
"""
|
||||
Map Pydantic types to YAML schema types.
|
||||
|
||||
Args:
|
||||
pydantic_type (type): The Pydantic type to be mapped.
|
||||
|
||||
Returns:
|
||||
str: The corresponding YAML schema type.
|
||||
|
||||
"""
|
||||
type_mapping = {
|
||||
int: "integer",
|
||||
float: "number",
|
||||
str: "string",
|
||||
bool: "boolean",
|
||||
list: "array",
|
||||
dict: "object",
|
||||
}
|
||||
# For more complex types or generics, you would expand this mapping
|
||||
base_type = getattr(pydantic_type, "__origin__", pydantic_type)
|
||||
if base_type is None:
|
||||
base_type = pydantic_type
|
||||
return type_mapping.get(base_type, "string")
|
||||
|
||||
|
||||
class YamlModel(BaseModel):
|
||||
"""
|
||||
A Pydantic model class for working with YAML data.
|
||||
|
||||
|
||||
Example usage:
|
||||
# Example usage with an extended YamlModel
|
||||
>>> class User(YamlModel):
|
||||
name: str
|
||||
age: int
|
||||
is_active: bool
|
||||
|
||||
# Create an instance of the User model
|
||||
>>> user = User(name="Alice", age=30, is_active=True)
|
||||
|
||||
# Serialize the User instance to YAML and print it
|
||||
>>> print(user.to_yaml())
|
||||
|
||||
# Convert JSON to YAML and print
|
||||
>>> json_string = '{"name": "Bob", "age": 25, "is_active": false}'
|
||||
>>> print(YamlModel.json_to_yaml(json_string))
|
||||
|
||||
# Save the User instance to a YAML file
|
||||
>>> user.save_to_yaml('user.yaml')
|
||||
"""
|
||||
|
||||
input_dict: Dict[str, Any] = Field(
|
||||
None,
|
||||
title="Data",
|
||||
description="The data to be serialized to YAML.",
|
||||
)
|
||||
|
||||
def to_yaml(self):
|
||||
"""
|
||||
Serialize the Pydantic model instance to a YAML string.
|
||||
"""
|
||||
return yaml.safe_dump(self.input_dict, sort_keys=False)
|
||||
|
||||
def from_yaml(self, cls, yaml_str: str):
|
||||
"""
|
||||
Create an instance of the class from a YAML string.
|
||||
|
||||
Args:
|
||||
yaml_str (str): The YAML string to parse.
|
||||
|
||||
Returns:
|
||||
cls: An instance of the class with attributes populated from the YAML data.
|
||||
Returns None if there was an error loading the YAML data.
|
||||
"""
|
||||
try:
|
||||
data = yaml.safe_load(yaml_str)
|
||||
return cls(**data)
|
||||
except ValueError as error:
|
||||
logger.error(f"Error loading YAML data: {error}")
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def json_to_yaml(self, json_str: str):
|
||||
"""
|
||||
Convert a JSON string to a YAML string.
|
||||
"""
|
||||
data = json.loads(
|
||||
json_str
|
||||
) # Convert JSON string to dictionary
|
||||
return yaml.dump(data)
|
||||
|
||||
def save_to_yaml(self, filename: str):
|
||||
"""
|
||||
Save the Pydantic model instance as a YAML file.
|
||||
"""
|
||||
yaml_data = self.to_yaml()
|
||||
with open(filename, "w") as file:
|
||||
file.write(yaml_data)
|
||||
|
||||
# TODO: Implement a method to create a YAML schema from the model fields
|
||||
# @classmethod
|
||||
# def create_yaml_schema(cls):
|
||||
# """
|
||||
# Generate a YAML schema based on the fields of the given BaseModel Class.
|
||||
|
||||
# Args:
|
||||
# cls: The class for which the YAML schema is generated.
|
||||
|
||||
# Returns:
|
||||
# A YAML representation of the schema.
|
||||
|
||||
# """
|
||||
# schema = {}
|
||||
# for field_name, model_field in cls.model_fields.items(): # Use model_fields
|
||||
# field_type = model_field.type_ # Assuming type_ for field type access
|
||||
# field_info = model_field # FieldInfo object already
|
||||
# schema[field_name] = {
|
||||
# 'type': pydantic_type_to_yaml_schema(field_type),
|
||||
# 'description': field_info.description or "No description provided."
|
||||
# }
|
||||
# if field_info is not None: # Check for default value directly
|
||||
# schema[field_name]['default'] = field_info.default
|
||||
# return yaml.safe_dump(schema, sort_keys=False)
|
||||
|
||||
def create_yaml_schema_from_dict(
|
||||
self, data: Dict[str, Any], model_class: Type
|
||||
) -> str:
|
||||
"""
|
||||
Generate a YAML schema based on a dictionary and a class (can be a Pydantic model, regular class, or dataclass).
|
||||
|
||||
Args:
|
||||
data: The dictionary with key-value pairs where keys are attribute names and values are example data.
|
||||
model_class: The class which the data should conform to, used for obtaining type information.
|
||||
|
||||
Returns:
|
||||
A string containing the YAML schema.
|
||||
|
||||
Example usage:
|
||||
>>> data = {'name': 'Alice', 'age: 30, 'is_active': True}
|
||||
"""
|
||||
return create_yaml_schema_from_dict(data, model_class)
|
||||
|
||||
def yaml_to_dict(self, yaml_str: str):
|
||||
"""
|
||||
Convert a YAML string to a Python dictionary.
|
||||
"""
|
||||
return yaml.safe_load(yaml_str)
|
||||
|
||||
def dict_to_yaml(self, data: Dict[str, Any]):
|
||||
"""
|
||||
Convert a Python dictionary to a YAML string.
|
||||
"""
|
||||
return yaml.safe_dump(data, sort_keys=False)
|
||||
|
||||
|
||||
# dict = {'name': 'Alice', 'age': 30, 'is_active': True}
|
||||
|
||||
# # Comvert the dictionary to a YAML schema dict to yaml
|
||||
# yaml_model = YamlModel().dict_to_yaml(dict)
|
||||
# print(yaml_model)
|
@ -0,0 +1,156 @@
|
||||
import os
|
||||
from pydantic import BaseModel, Field
|
||||
from swarm_models import OpenAIFunctionCaller
|
||||
from dotenv import load_dotenv
|
||||
from typing import Any
|
||||
from swarms.utils.loguru_logger import logger
|
||||
from swarms.tools.prebuilt.code_executor import CodeExecutor
|
||||
|
||||
load_dotenv()
|
||||
|
||||
|
||||
class Tool(BaseModel):
|
||||
id: str = Field(
|
||||
description="A unique identifier for the task. This should be a short, descriptive name that captures the main purpose of the task. Use - to separate words and make it lowercase."
|
||||
)
|
||||
plan: str = Field(
|
||||
description="The comprehensive plan detailing how the task will accomplish the given task. This should include the high-level strategy, key milestones, and expected outcomes. The plan should clearly articulate what the overall goal is, what success looks like, and how progress will be measured throughout execution."
|
||||
)
|
||||
failures_prediction: str = Field(
|
||||
description="A thorough analysis of potential failure modes and mitigation strategies. This should identify technical risks, edge cases, error conditions, and possible points of failure in the task. For each identified risk, include specific preventive measures, fallback approaches, and recovery procedures to ensure robustness and reliability."
|
||||
)
|
||||
rationale: str = Field(
|
||||
description="The detailed reasoning and justification for why this specific task design is optimal for the given task. This should explain the key architectural decisions, tradeoffs considered, alternatives evaluated, and why this approach best satisfies the requirements. Include both technical and business factors that influenced the design."
|
||||
)
|
||||
code: str = Field(
|
||||
description="Generate the code for the task. This should be a python function that takes in a task and returns a result. The code should be a complete and working implementation of the task. Include all necessary imports and dependencies and add types, docstrings, and comments to the code. Make sure the main code executes successfully. No placeholders or comments. Make sure the main function executes successfully."
|
||||
)
|
||||
|
||||
|
||||
def setup_model(base_model: BaseModel = Tool):
|
||||
model = OpenAIFunctionCaller(
|
||||
system_prompt="""You are an expert Python developer specializing in building reliable API integrations and developer tools. Your role is to generate production-ready code that follows best practices for API interactions and tool development.
|
||||
|
||||
When given a task, you will:
|
||||
1. Design robust error handling and retry mechanisms for API calls
|
||||
2. Implement proper authentication and security measures
|
||||
3. Structure code for maintainability and reusability
|
||||
4. Add comprehensive logging and monitoring
|
||||
5. Include detailed type hints and documentation
|
||||
6. Write unit tests to verify functionality
|
||||
|
||||
Your code should follow these principles:
|
||||
- Use modern Python features and idioms
|
||||
- Handle rate limits and API quotas gracefully
|
||||
- Validate inputs and outputs thoroughly
|
||||
- Follow security best practices for API keys and secrets
|
||||
- Include clear error messages and debugging info
|
||||
- Be well-documented with docstrings and comments
|
||||
- Use appropriate design patterns
|
||||
- Follow PEP 8 style guidelines
|
||||
|
||||
The generated code should be complete, tested, and ready for production use. Include all necessary imports, error handling, and helper functions.
|
||||
""",
|
||||
base_model=base_model,
|
||||
openai_api_key=os.getenv("OPENAI_API_KEY"),
|
||||
temperature=0.5,
|
||||
)
|
||||
return model
|
||||
|
||||
|
||||
def generate_tool(task: str) -> Any:
|
||||
model = setup_model()
|
||||
response = model.run(task)
|
||||
logger.info(f"Response: {response}")
|
||||
|
||||
# If response is a dict, get code directly
|
||||
if isinstance(response, dict):
|
||||
# return response.get("code", "")
|
||||
code = response.get("code", "")
|
||||
logger.info(f"Code: {code}")
|
||||
return code
|
||||
# If response is a Tool object, access code attribute
|
||||
elif isinstance(response, Tool):
|
||||
code = response.code
|
||||
logger.info(f"Code: {code}")
|
||||
return code
|
||||
# If response is a string (raw code)
|
||||
elif isinstance(response, str):
|
||||
code = response
|
||||
logger.info(f"Code: {code}")
|
||||
return code
|
||||
logger.error(f"Unexpected response type: {type(response)}")
|
||||
return ""
|
||||
|
||||
|
||||
def execute_generated_code(code: str) -> Any:
|
||||
"""
|
||||
Attempts to execute the generated Python code, handling errors and retrying if necessary.
|
||||
|
||||
Args:
|
||||
code (str): The Python code to be executed.
|
||||
|
||||
Returns:
|
||||
Any: Output of the code execution, or error details if execution fails.
|
||||
"""
|
||||
logger.info("Starting code execution")
|
||||
try:
|
||||
exec_namespace = {}
|
||||
exec(code, exec_namespace)
|
||||
|
||||
# Check for any callable functions in the namespace
|
||||
main_function = None
|
||||
for item in exec_namespace.values():
|
||||
if callable(item) and not item.__name__.startswith('__'):
|
||||
main_function = item
|
||||
break
|
||||
|
||||
if main_function:
|
||||
result = main_function()
|
||||
logger.info(f"Code execution successful. Function result: {result}")
|
||||
return result
|
||||
elif "result" in exec_namespace:
|
||||
logger.info(f"Code execution successful. Result variable: {exec_namespace['result']}")
|
||||
return exec_namespace['result']
|
||||
else:
|
||||
logger.warning("Code execution completed but no result found")
|
||||
return "No result or function found in executed code."
|
||||
except Exception as e:
|
||||
logger.error(f"Code execution failed with error: {str(e)}", exc_info=True)
|
||||
return e
|
||||
|
||||
|
||||
def retry_until_success(task: str, max_retries: int = 5):
|
||||
"""
|
||||
Generates and executes code until the execution is successful.
|
||||
|
||||
Args:
|
||||
task (str): Task description to generate the required code.
|
||||
"""
|
||||
attempts = 0
|
||||
|
||||
while attempts < max_retries:
|
||||
logger.info(f"Attempt {attempts + 1} of {max_retries}")
|
||||
tool = generate_tool(task)
|
||||
logger.debug(f"Generated code:\n{tool}")
|
||||
|
||||
# result = execute_generated_code(tool)
|
||||
result = CodeExecutor().execute(code=tool)
|
||||
logger.info(f"Result: {result}")
|
||||
|
||||
if isinstance(result, Exception):
|
||||
logger.error(f"Attempt {attempts + 1} failed: {str(result)}")
|
||||
print("Retrying with updated code...")
|
||||
attempts += 1
|
||||
else:
|
||||
logger.info(f"Success on attempt {attempts + 1}. Result: {result}")
|
||||
print(f"Code executed successfully: {result}")
|
||||
break
|
||||
else:
|
||||
logger.error("Max retries reached. Execution failed.")
|
||||
print("Max retries reached. Execution failed.")
|
||||
|
||||
# Usage
|
||||
retry_until_success(
|
||||
"Write a function to fetch and display weather information from a given API."
|
||||
)
|
Loading…
Reference in new issue