pull/634/head
Your Name 2 months ago
parent a1ff1e1392
commit 2b4be2bef7

@ -1,68 +0,0 @@
import os
from swarms import Agent
from swarm_models import OpenAIChat
from swarms.structs.agents_available import showcase_available_agents
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
)
# Initialize the Claims Director agent
director_agent = Agent(
agent_name="ClaimsDirector",
agent_description="Oversees and coordinates the medical insurance claims processing workflow",
system_prompt="""You are the Claims Director responsible for managing the medical insurance claims process.
Assign and prioritize tasks between claims processors and auditors. Ensure claims are handled efficiently
and accurately while maintaining compliance with insurance policies and regulations.""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="director_agent.json",
)
# Initialize Claims Processor agent
processor_agent = Agent(
agent_name="ClaimsProcessor",
agent_description="Reviews and processes medical insurance claims, verifying coverage and eligibility",
system_prompt="""Review medical insurance claims for completeness and accuracy. Verify patient eligibility,
coverage details, and process claims according to policy guidelines. Flag any claims requiring special review.""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="processor_agent.json",
)
# Initialize Claims Auditor agent
auditor_agent = Agent(
agent_name="ClaimsAuditor",
agent_description="Audits processed claims for accuracy and compliance with policies and regulations",
system_prompt="""Audit processed insurance claims for accuracy and compliance. Review claim decisions,
identify potential fraud or errors, and ensure all processing follows established guidelines and regulations.""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="auditor_agent.json",
)
# Create a list of agents
agents = [director_agent, processor_agent, auditor_agent]
print(showcase_available_agents(agents=agents))

@ -1,99 +0,0 @@
import os
from swarm_models import OpenAIChat
from swarms import Agent, run_agents_with_tasks_concurrently
# Fetch the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
openai_api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
)
# Initialize agents for different roles
delaware_ccorp_agent = Agent(
agent_name="Delaware-CCorp-Hiring-Agent",
system_prompt="""
Create a comprehensive hiring description for a Delaware C Corporation,
including all relevant laws and regulations, such as the Delaware General
Corporation Law (DGCL) and the Delaware Corporate Law. Ensure the description
covers the requirements for hiring employees, contractors, and officers,
including the necessary paperwork, tax obligations, and benefits. Also,
outline the procedures for compliance with Delaware's employment laws,
including anti-discrimination laws, workers' compensation, and unemployment
insurance. Provide guidance on how to navigate the complexities of Delaware's
corporate law and ensure that all hiring practices are in compliance with
state and federal regulations.
""",
llm=model,
max_loops=1,
autosave=False,
dashboard=False,
verbose=True,
output_type="str",
artifacts_on=True,
artifacts_output_path="delaware_ccorp_hiring_description.md",
artifacts_file_extension=".md",
)
indian_foreign_agent = Agent(
agent_name="Indian-Foreign-Hiring-Agent",
system_prompt="""
Create a comprehensive hiring description for an Indian or foreign country,
including all relevant laws and regulations, such as the Indian Contract Act,
the Indian Labour Laws, and the Foreign Exchange Management Act (FEMA).
Ensure the description covers the requirements for hiring employees,
contractors, and officers, including the necessary paperwork, tax obligations,
and benefits. Also, outline the procedures for compliance with Indian and
foreign employment laws, including anti-discrimination laws, workers'
compensation, and unemployment insurance. Provide guidance on how to navigate
the complexities of Indian and foreign corporate law and ensure that all hiring
practices are in compliance with state and federal regulations. Consider the
implications of hiring foreign nationals and the requirements for obtaining
necessary visas and work permits.
""",
llm=model,
max_loops=1,
autosave=False,
dashboard=False,
verbose=True,
output_type="str",
artifacts_on=True,
artifacts_output_path="indian_foreign_hiring_description.md",
artifacts_file_extension=".md",
)
# List of agents and corresponding tasks
agents = [delaware_ccorp_agent, indian_foreign_agent]
tasks = [
"""
Create a comprehensive hiring description for an Agent Engineer, including
required skills and responsibilities. Ensure the description covers the
necessary technical expertise, such as proficiency in AI/ML frameworks,
programming languages, and data structures. Outline the key responsibilities,
including designing and developing AI agents, integrating with existing systems,
and ensuring scalability and performance.
""",
"""
Generate a detailed job description for a Prompt Engineer, including
required skills and responsibilities. Ensure the description covers the
necessary technical expertise, such as proficiency in natural language processing,
machine learning, and software development. Outline the key responsibilities,
including designing and optimizing prompts for AI systems, ensuring prompt
quality and consistency, and collaborating with cross-functional teams.
""",
]
# Run agents with tasks concurrently
results = run_agents_with_tasks_concurrently(
agents,
tasks,
all_cores=True,
device="cpu",
)
# Print the results
for result in results:
print(result)

@ -1,119 +0,0 @@
import os
from swarms import Agent, AgentRearrange
from swarm_models import OpenAIChat
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
)
# Initialize the boss agent (Director)
boss_agent = Agent(
agent_name="BossAgent",
system_prompt="""
You are the BossAgent responsible for managing and overseeing a swarm of agents analyzing company expenses.
Your job is to dynamically assign tasks, prioritize their execution, and ensure that all agents collaborate efficiently.
After receiving a report on the company's expenses, you will break down the work into smaller tasks,
assigning specific tasks to each agent, such as detecting recurring high costs, categorizing expenditures,
and identifying unnecessary transactions. Ensure the results are communicated back in a structured way
so the finance team can take actionable steps to cut off unproductive spending. You also monitor and
dynamically adapt the swarm to optimize their performance. Finally, you summarize their findings
into a coherent report.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="boss_agent.json",
)
# Initialize worker 1: Expense Analyzer
worker1 = Agent(
agent_name="ExpenseAnalyzer",
system_prompt="""
Your task is to carefully analyze the company's expense data provided to you.
You will focus on identifying high-cost recurring transactions, categorizing expenditures
(e.g., marketing, operations, utilities, etc.), and flagging areas where there seems to be excessive spending.
You will provide a detailed breakdown of each category, along with specific recommendations for cost-cutting.
Pay close attention to monthly recurring subscriptions, office supplies, and non-essential expenditures.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="worker1.json",
)
# Initialize worker 2: Summary Generator
worker2 = Agent(
agent_name="SummaryGenerator",
system_prompt="""
After receiving the detailed breakdown from the ExpenseAnalyzer,
your task is to create a concise summary of the findings. You will focus on the most actionable insights,
such as highlighting the specific transactions that can be immediately cut off and summarizing the areas
where the company is overspending. Your summary will be used by the BossAgent to generate the final report.
Be clear and to the point, emphasizing the urgency of cutting unnecessary expenses.
""",
llm=model,
max_loops=1,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
state_save_file_type="json",
saved_state_path="worker2.json",
)
# Swarm-Level Prompt (Collaboration Prompt)
swarm_prompt = """
As a swarm, your collective goal is to analyze the company's expenses and identify transactions that should be cut off.
You will work collaboratively to break down the entire process of expense analysis into manageable steps.
The BossAgent will direct the flow and assign tasks dynamically to the agents. The ExpenseAnalyzer will first
focus on breaking down the expense report, identifying high-cost recurring transactions, categorizing them,
and providing recommendations for potential cost reduction. After the analysis, the SummaryGenerator will then
consolidate all the findings into an actionable summary that the finance team can use to immediately cut off unnecessary expenses.
Together, your collaboration is essential to streamlining and improving the companys financial health.
"""
# Create a list of agents
agents = [boss_agent, worker1, worker2]
# Define the flow pattern for the swarm
flow = "BossAgent -> ExpenseAnalyzer -> SummaryGenerator"
# Using AgentRearrange class to manage the swarm
agent_system = AgentRearrange(
agents=agents,
flow=flow,
return_json=False,
output_type="final",
max_loops=1,
docs=["SECURITY.md"],
)
# Input task for the swarm
task = f"""
{swarm_prompt}
The company has been facing a rising number of unnecessary expenses, and the finance team needs a detailed
analysis of recent transactions to identify which expenses can be cut off to improve profitability.
Analyze the provided transaction data and create a detailed report on cost-cutting opportunities,
focusing on recurring transactions and non-essential expenditures.
"""
# Run the swarm system with the task
output = agent_system.run(task)
print(output)

@ -1,117 +0,0 @@
import os
from dotenv import load_dotenv
from swarms import Agent, SequentialWorkflow
from swarm_models import OpenAIChat
load_dotenv()
# Get the OpenAI API key from the environment variable
api_key = os.getenv("GROQ_API_KEY")
# Model
model = OpenAIChat(
openai_api_base="https://api.groq.com/openai/v1",
openai_api_key=api_key,
model_name="llama-3.1-70b-versatile",
temperature=0.1,
)
# Initialize specialized agents
data_extractor_agent = Agent(
agent_name="Data-Extractor",
system_prompt=None,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="data_extractor_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
summarizer_agent = Agent(
agent_name="Document-Summarizer",
system_prompt=None,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="summarizer_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
financial_analyst_agent = Agent(
agent_name="Financial-Analyst",
system_prompt=None,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="financial_analyst_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
market_analyst_agent = Agent(
agent_name="Market-Analyst",
system_prompt=None,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="market_analyst_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
operational_analyst_agent = Agent(
agent_name="Operational-Analyst",
system_prompt=None,
llm=model,
max_loops=1,
autosave=True,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path="operational_analyst_agent.json",
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
)
# Initialize the SwarmRouter
router = SequentialWorkflow(
name="pe-document-analysis-swarm",
description="Analyze documents for private equity due diligence and investment decision-making",
max_loops=1,
agents=[
data_extractor_agent,
summarizer_agent,
financial_analyst_agent,
market_analyst_agent,
operational_analyst_agent,
],
output_type="all",
)
# Example usage
if __name__ == "__main__":
# Run a comprehensive private equity document analysis task
result = router.run(
"Where is the best place to find template term sheets for series A startups. Provide links and references"
)
print(result)

@ -1,93 +0,0 @@
from typing import List, Any
from loguru import logger
from swarms.structs.agent import Agent
def get_agent_name(agent: Any) -> str:
"""Helper function to safely get agent name
Args:
agent (Any): The agent object to get name from
Returns:
str: The agent's name if found, 'Unknown' otherwise
"""
if isinstance(agent, Agent) and hasattr(agent, "agent_name"):
return agent.agent_name
return "Unknown"
def get_agent_description(agent: Any) -> str:
"""Helper function to get agent description or system prompt preview
Args:
agent (Any): The agent object
Returns:
str: Description or first 100 chars of system prompt
"""
if not isinstance(agent, Agent):
return "N/A"
if hasattr(agent, "description") and agent.description:
return agent.description
if hasattr(agent, "system_prompt") and agent.system_prompt:
return f"{agent.system_prompt[:150]}..."
return "N/A"
def showcase_available_agents(
name: str = None,
description: str = None,
agents: List[Agent] = [],
update_agents_on: bool = False,
) -> str:
"""
Generate a formatted string showcasing all available agents and their descriptions.
Args:
agents (List[Agent]): List of Agent objects to showcase.
update_agents_on (bool, optional): If True, updates each agent's system prompt with
the showcase information. Defaults to False.
Returns:
str: Formatted string containing agent information, including names, descriptions
and IDs for all available agents.
"""
logger.info(f"Showcasing {len(agents)} available agents")
formatted_agents = []
header = f"\n####### Agents available in the swarm: {name} ############\n"
header += f"{description}\n"
row_format = "{:<5} | {:<20} | {:<50}"
header_row = row_format.format("ID", "Agent Name", "Description")
separator = "-" * 80
formatted_agents.append(header)
formatted_agents.append(separator)
formatted_agents.append(header_row)
formatted_agents.append(separator)
for idx, agent in enumerate(agents):
if not isinstance(agent, Agent):
logger.warning(
f"Skipping non-Agent object: {type(agent)}"
)
continue
agent_name = get_agent_name(agent)
description = (
get_agent_description(agent)[:100] + "..."
if len(get_agent_description(agent)) > 100
else get_agent_description(agent)
)
formatted_agents.append(
row_format.format(idx + 1, agent_name, description)
)
showcase = "\n".join(formatted_agents)
return showcase

@ -1,299 +0,0 @@
from loguru import logger
import os
from typing import List
from pydantic import BaseModel, Field
from swarm_models import OpenAIFunctionCaller, OpenAIChat
from swarms.structs.agent import Agent
from swarms.structs.swarm_router import SwarmRouter
class AgentConfig(BaseModel):
"""Configuration for an individual agent in a swarm"""
name: str = Field(
description="The name of the agent", example="Research-Agent"
)
description: str = Field(
description="A description of the agent's purpose and capabilities",
example="Agent responsible for researching and gathering information",
)
system_prompt: str = Field(
description="The system prompt that defines the agent's behavior",
example="You are a research agent. Your role is to gather and analyze information...",
)
max_loops: int = Field(
description="Maximum number of reasoning loops the agent can perform",
example=3,
)
class SwarmConfig(BaseModel):
"""Configuration for a swarm of cooperative agents"""
name: str = Field(
description="The name of the swarm",
example="Research-Writing-Swarm",
)
description: str = Field(
description="The description of the swarm's purpose and capabilities",
example="A swarm of agents that work together to research topics and write articles",
)
agents: List[AgentConfig] = Field(
description="The list of agents that make up the swarm",
example=[
AgentConfig(
name="Research-Agent",
description="Gathers information",
system_prompt="You are a research agent...",
max_loops=2,
),
AgentConfig(
name="Writing-Agent",
description="Writes content",
system_prompt="You are a writing agent...",
max_loops=1,
),
],
)
max_loops: int = Field(
description="The maximum number of loops to run the swarm",
example=1,
)
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
openai_api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
)
BOSS_SYSTEM_PROMPT = """
Manage a swarm of worker agents to efficiently serve the user by deciding whether to create new agents or delegate tasks. Ensure operations are efficient and effective.
### Instructions:
1. **Task Assignment**:
- Analyze available worker agents when a task is presented.
- Delegate tasks to existing agents with clear, direct, and actionable instructions if an appropriate agent is available.
- If no suitable agent exists, create a new agent with a fitting system prompt to handle the task.
2. **Agent Creation**:
- Name agents according to the task they are intended to perform (e.g., "Twitter Marketing Agent").
- Provide each new agent with a concise and clear system prompt that includes its role, objectives, and any tools it can utilize.
3. **Efficiency**:
- Minimize redundancy and maximize task completion speed.
- Avoid unnecessary agent creation if an existing agent can fulfill the task.
4. **Communication**:
- Be explicit in task delegation instructions to avoid ambiguity and ensure effective task execution.
- Require agents to report back on task completion or encountered issues.
5. **Reasoning and Decisions**:
- Offer brief reasoning when selecting or creating agents to maintain transparency.
- Avoid using an agent if unnecessary, with a clear explanation if no agents are suitable for a task.
# Output Format
Present your plan in clear, bullet-point format or short concise paragraphs, outlining task assignment, agent creation, efficiency strategies, and communication protocols.
# Notes
- Preserve transparency by always providing reasoning for task-agent assignments and creation.
- Ensure instructions to agents are unambiguous to minimize error.
"""
class AutoSwarmBuilder:
"""A class that automatically builds and manages swarms of AI agents.
This class handles the creation, coordination and execution of multiple AI agents working
together as a swarm to accomplish complex tasks. It uses a boss agent to delegate work
and create new specialized agents as needed.
Args:
name (str): The name of the swarm
description (str): A description of the swarm's purpose
verbose (bool, optional): Whether to output detailed logs. Defaults to True.
max_loops (int, optional): Maximum number of execution loops. Defaults to 1.
"""
def __init__(
self,
name: str = None,
description: str = None,
verbose: bool = True,
max_loops: int = 1,
):
self.name = name
self.description = description
self.verbose = verbose
self.max_loops = max_loops
self.agents_pool = []
logger.info(
f"Initialized AutoSwarmBuilder: {name} {description}"
)
# @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def run(self, task: str, image_url: str = None, *args, **kwargs):
"""Run the swarm on a given task.
Args:
task (str): The task to be accomplished
image_url (str, optional): URL of an image input if needed. Defaults to None.
*args: Variable length argument list
**kwargs: Arbitrary keyword arguments
Returns:
The output from the swarm's execution
"""
logger.info(f"Running swarm on task: {task}")
agents = self._create_agents(task, image_url, *args, **kwargs)
logger.info(f"Agents created {len(agents)}")
logger.info("Routing task through swarm")
output = self.swarm_router(agents, task, image_url)
logger.info(f"Swarm execution complete with output: {output}")
return output
# @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def _create_agents(self, task: str, *args, **kwargs):
"""Create the necessary agents for a task.
Args:
task (str): The task to create agents for
*args: Variable length argument list
**kwargs: Arbitrary keyword arguments
Returns:
list: List of created agents
"""
logger.info("Creating agents for task")
model = OpenAIFunctionCaller(
system_prompt=BOSS_SYSTEM_PROMPT,
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.1,
base_model=SwarmConfig,
)
agents_dictionary = model.run(task)
logger.info(f"Agents dictionary: {agents_dictionary}")
# Convert dictionary to SwarmConfig if needed
if isinstance(agents_dictionary, dict):
agents_dictionary = SwarmConfig(**agents_dictionary)
# Set swarm config
self.name = agents_dictionary.name
self.description = agents_dictionary.description
self.max_loops = getattr(
agents_dictionary, "max_loops", 1
) # Default to 1 if not set
logger.info(
f"Swarm config: {self.name}, {self.description}, {self.max_loops}"
)
# Create agents from config
agents = []
for agent_config in agents_dictionary.agents:
# Convert dict to AgentConfig if needed
if isinstance(agent_config, dict):
agent_config = AgentConfig(**agent_config)
agent = self.build_agent(
agent_name=agent_config.name,
agent_description=agent_config.description,
agent_system_prompt=agent_config.system_prompt,
max_loops=agent_config.max_loops,
)
agents.append(agent)
return agents
def build_agent(
self,
agent_name: str,
agent_description: str,
agent_system_prompt: str,
max_loops: int = 1,
):
"""Build a single agent with the given specifications.
Args:
agent_name (str): Name of the agent
agent_description (str): Description of the agent's purpose
agent_system_prompt (str): The system prompt for the agent
Returns:
Agent: The constructed agent instance
"""
logger.info(f"Building agent: {agent_name}")
agent = Agent(
agent_name=agent_name,
description=agent_description,
system_prompt=agent_system_prompt,
llm=model,
max_loops=max_loops,
autosave=True,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path=f"{agent_name}.json",
user_name="swarms_corp",
retry_attempts=1,
context_length=200000,
return_step_meta=False,
output_type="str", # "json", "dict", "csv" OR "string" soon "yaml" and
streaming_on=False,
auto_generate_prompt=True,
)
return agent
# @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def swarm_router(
self,
agents: List[Agent],
task: str,
image_url: str = None,
*args,
**kwargs,
):
"""Route tasks between agents in the swarm.
Args:
agents (List[Agent]): List of available agents
task (str): The task to route
image_url (str, optional): URL of an image input if needed. Defaults to None.
*args: Variable length argument list
**kwargs: Arbitrary keyword arguments
Returns:
The output from the routed task execution
"""
logger.info("Routing task through swarm")
swarm_router_instance = SwarmRouter(
agents=agents,
swarm_type="auto",
max_loops=1,
)
return swarm_router_instance.run(
self.name + " " + self.description + " " + task,
)
example = AutoSwarmBuilder()
print(
example.run(
"Write multiple blog posts about the latest advancements in swarm intelligence all at once"
)
)

@ -1,34 +0,0 @@
from typing import Union, Dict, List
from swarms.artifacts.main_artifact import Artifact
def handle_artifact_outputs(
file_path: str,
data: Union[str, Dict, List],
output_type: str = "txt",
folder_path: str = "./artifacts",
) -> str:
"""
Handle different types of data and create files in various formats.
Args:
file_path: Path where the file should be saved
data: Input data that can be string, dict or list
output_type: Type of output file (txt, md, pdf, csv, json)
folder_path: Folder to save artifacts
Returns:
str: Path to the created file
"""
# Create artifact with appropriate file type
artifact = Artifact(
folder_path=folder_path,
file_path=file_path,
file_type=output_type,
contents=data,
edit_count=0,
)
# Save the file
# artifact.save()
artifact.save_as(output_format=output_type)

@ -1,78 +0,0 @@
from loguru import logger
from typing import List, Union, Callable, Optional
from swarms.structs.agent import Agent
def reliability_check(
agents: List[Union[Agent, Callable]],
max_loops: int,
name: Optional[str] = None,
description: Optional[str] = None,
flow: Optional[str] = None,
) -> None:
"""
Performs reliability checks on swarm configuration parameters.
Args:
agents: List of Agent objects or callables that will be executed
max_loops: Maximum number of execution loops
name: Name identifier for the swarm
description: Description of the swarm's purpose
Raises:
ValueError: If any parameters fail validation checks
TypeError: If parameters are of incorrect type
"""
logger.info("Initializing swarm reliability checks")
# Type checking
if not isinstance(agents, list):
raise TypeError("agents parameter must be a list")
if not isinstance(max_loops, int):
raise TypeError("max_loops must be an integer")
# Validate agents
if not agents:
raise ValueError("Agents list cannot be empty")
for i, agent in enumerate(agents):
if not isinstance(agent, (Agent, Callable)):
raise TypeError(
f"Agent at index {i} must be an Agent instance or Callable"
)
# Validate max_loops
if max_loops <= 0:
raise ValueError("max_loops must be greater than 0")
if max_loops > 1000:
logger.warning(
"Large max_loops value detected. This may impact performance."
)
# Validate name
if name is None:
raise ValueError("name parameter is required")
if not isinstance(name, str):
raise TypeError("name must be a string")
if len(name.strip()) == 0:
raise ValueError("name cannot be empty or just whitespace")
# Validate description
if description is None:
raise ValueError("description parameter is required")
if not isinstance(description, str):
raise TypeError("description must be a string")
if len(description.strip()) == 0:
raise ValueError(
"description cannot be empty or just whitespace"
)
# Validate flow
if flow is None:
raise ValueError("flow parameter is required")
if not isinstance(flow, str):
raise TypeError("flow must be a string")
logger.info("All reliability checks passed successfully")

@ -1,77 +0,0 @@
import os
from typing import Any
from clusterops import (
execute_on_gpu,
execute_on_multiple_gpus,
execute_with_cpu_cores,
list_available_gpus,
)
from loguru import logger
def exec_callable_with_clusterops(
device: str = "cpu",
device_id: int = 0,
all_cores: bool = True,
all_gpus: bool = False,
func: callable = None,
*args,
**kwargs,
) -> Any:
"""
Executes a given function on a specified device, either CPU or GPU.
This method attempts to execute a given function on a specified device, either CPU or GPU. It logs the device selection and the number of cores or GPU ID used. If the device is set to CPU, it can use all available cores or a specific core specified by `device_id`. If the device is set to GPU, it uses the GPU specified by `device_id`.
Args:
device (str, optional): The device to use for execution. Defaults to "cpu".
device_id (int, optional): The ID of the GPU to use if device is set to "gpu". Defaults to 0.
all_cores (bool, optional): If True, uses all available CPU cores. Defaults to True.
all_gpus (bool, optional): If True, uses all available GPUs. Defaults to False.
func (callable): The function to execute.
*args: Additional positional arguments to be passed to the execution method.
**kwargs: Additional keyword arguments to be passed to the execution method.
Returns:
Any: The result of the execution.
Raises:
ValueError: If an invalid device is specified.
Exception: If any other error occurs during execution.
"""
try:
logger.info(f"Attempting to run on device: {device}")
if device == "cpu":
logger.info("Device set to CPU")
if all_cores is True:
count = os.cpu_count()
logger.info(f"Using all available CPU cores: {count}")
else:
count = device_id
logger.info(f"Using specific CPU core: {count}")
return execute_with_cpu_cores(
count, func, *args, **kwargs
)
# If device gpu
elif device == "gpu":
logger.info("Device set to GPU")
return execute_on_gpu(device_id, func, *args, **kwargs)
elif device == "gpu" and all_gpus is True:
logger.info("Device set to GPU and running all gpus")
gpus = [int(gpu) for gpu in list_available_gpus()]
return execute_on_multiple_gpus(
gpus, func, *args, **kwargs
)
else:
raise ValueError(
f"Invalid device specified: {device}. Supported devices are 'cpu' and 'gpu'."
)
except ValueError as e:
logger.error(f"Invalid device specified: {e}")
raise e
except Exception as e:
logger.error(f"An error occurred during execution: {e}")
raise e
Loading…
Cancel
Save