[KAIZEN][Delete retry_func] [delete profile_all func] [docs back to their place]

pull/903/head^2
Kye Gomez 1 month ago
parent 95001c1f4b
commit d6ef64eb4a

@ -40914,7 +40914,6 @@ Runs agents with system resource monitoring and adaptive batch sizing.
## Performance Considerations
- All functions are decorated with `@profile_func` for performance monitoring
- Default batch sizes and worker counts are optimized based on CPU cores
- Resource monitoring helps prevent system overload
- Using `uvloop` provides better performance than standard `asyncio`

@ -94,45 +94,149 @@ result = router.run(
### Real-time Dashboard
The ConcurrentWorkflow now includes a real-time dashboard feature that can be enabled by setting `show_dashboard=True`. This provides:
The ConcurrentWorkflow includes a powerful real-time dashboard feature that provides comprehensive monitoring and visualization of agent execution. Enable it by setting `show_dashboard=True` during workflow initialization.
- Live status of each agent's execution
- Progress tracking
- Real-time output visualization
- Task completion metrics
#### Dashboard Features
- **Live Status Tracking**: Real-time updates showing each agent's execution status
- **Progress Visualization**: Visual indicators of agent progress and completion
- **Output Streaming**: Live display of agent outputs as they're generated
- **Error Monitoring**: Immediate visibility into any agent failures or errors
- **Performance Metrics**: Execution time and completion statistics
- **Clean Display**: Automatic cleanup and formatting for optimal viewing
#### Dashboard Status Values
- **"pending"**: Agent is queued but not yet started
- **"running"**: Agent is currently executing the task
- **"completed"**: Agent finished successfully with output
- **"error"**: Agent execution failed with error details
#### Dashboard Configuration
```python
# Enable dashboard with custom configuration
workflow = ConcurrentWorkflow(
name="my-workflow",
agents=agents,
show_dashboard=True, # Enable real-time monitoring
output_type="dict", # Configure output format
auto_save=True, # Auto-save conversation history
)
```
#### Dashboard Behavior
When `show_dashboard=True`:
- Individual agent print outputs are automatically disabled to prevent conflicts
- Dashboard updates every 100ms for smooth real-time streaming
- Initial dashboard shows all agents as "pending"
- Real-time updates show status changes and output previews
- Final dashboard displays complete results summary
- Automatic cleanup of dashboard resources after completion
### Concurrent Execution
- Multiple agents work simultaneously
- Efficient resource utilization
- Automatic task distribution
- Built-in thread management
- **ThreadPoolExecutor**: Uses 95% of available CPU cores for optimal performance
- **True Parallelism**: Agents execute simultaneously, not sequentially
- **Thread Safety**: Safe concurrent access to shared resources
- **Error Isolation**: Individual agent failures don't affect others
- **Resource Management**: Automatic thread lifecycle management
### Output Formatting Options
The workflow supports multiple output aggregation formats:
- **"dict-all-except-first"**: Dictionary with all agent outputs except the first (default)
- **"dict"**: Complete dictionary with all agent outputs keyed by agent name
- **"str"**: Concatenated string of all agent outputs
- **"list"**: List of individual agent outputs in completion order
```python
# Configure output format
workflow = ConcurrentWorkflow(
agents=agents,
output_type="dict", # Get complete dictionary of results
show_dashboard=True
)
```
### Advanced Features
#### Auto Prompt Engineering
Enable automatic prompt optimization for all agents:
```python
workflow = ConcurrentWorkflow(
agents=agents,
auto_generate_prompts=True, # Enable automatic prompt engineering
show_dashboard=True
)
```
#### Conversation History Management
Automatic conversation tracking and persistence:
```python
workflow = ConcurrentWorkflow(
agents=agents,
auto_save=True, # Auto-save conversation history
metadata_output_path="results.json" # Custom output file path
)
```
#### Multimodal Support
Support for image inputs across all agents:
```python
# Single image input
result = workflow.run(
task="Analyze this chart",
img="financial_chart.png"
)
# Multiple image inputs
result = workflow.run(
task="Compare these charts",
imgs=["chart1.png", "chart2.png", "chart3.png"]
)
```
## Best Practices
1. Task Distribution:
- Break down complex tasks into independent subtasks
- Assign appropriate agents to each subtask
- Ensure tasks can be processed concurrently
### 1. Dashboard Usage
- **Development & Debugging**: Use dashboard for real-time monitoring during development
- **Production**: Consider disabling dashboard for headless execution in production
- **Performance**: Dashboard adds minimal overhead but provides valuable insights
- **Error Handling**: Dashboard immediately shows which agents fail and why
### 2. Agent Configuration
- **Specialization**: Use specialized agents for specific tasks
- **Model Selection**: Choose appropriate models for each agent's role
- **Temperature**: Configure temperature based on task requirements
- **System Prompts**: Write clear, specific system prompts for each agent
2. Agent Configuration:
- Use specialized agents for specific tasks
- Configure appropriate model parameters
- Set meaningful system prompts
### 3. Resource Management
3. Resource Management:
- Monitor concurrent execution through the dashboard
- Handle rate limits appropriately
- Manage memory usage
- **CPU Utilization**: Workflow automatically uses 95% of available cores
- **Memory**: Monitor conversation history growth in long-running workflows
- **Rate Limits**: Handle API rate limits appropriately for your LLM provider
- **Error Recovery**: Implement fallback mechanisms for failed agents
4. Error Handling:
- Implement proper error handling
- Log errors and exceptions
- Provide fallback mechanisms
### 4. Task Design
## Example Implementation
- **Independence**: Ensure tasks can be processed concurrently without dependencies
- **Granularity**: Break complex tasks into independent subtasks
- **Balance**: Distribute work evenly across agents for optimal performance
Here's a complete example showing how to use ConcurrentWorkflow for a comprehensive market analysis:
## Example Implementations
### Comprehensive Market Analysis
```python
from swarms import Agent
@ -184,6 +288,8 @@ workflow = ConcurrentWorkflow(
agents=[market_analyst, financial_analyst, risk_analyst],
max_loops=1,
show_dashboard=True, # Enable real-time monitoring
output_type="dict", # Get structured results
auto_save=True, # Save conversation history
)
try:
@ -197,13 +303,102 @@ try:
# Process and display results
print("\nAnalysis Results:")
print("=" * 50)
for agent_output in result:
print(f"\nAnalysis from {agent_output['agent']}:")
for agent_name, output in result.items():
print(f"\nAnalysis from {agent_name}:")
print("-" * 40)
print(agent_output['output'])
print(output)
except Exception as e:
print(f"Error during analysis: {str(e)}")
```
This guide demonstrates how to effectively use the ConcurrentWorkflow architecture with its new dashboard feature for parallel processing of complex tasks using multiple specialized agents.
### Batch Processing with Dashboard
```python
# Process multiple tasks sequentially with concurrent agent execution
tasks = [
"Analyze Q1 financial performance and market position",
"Analyze Q2 financial performance and market position",
"Analyze Q3 financial performance and market position",
"Analyze Q4 financial performance and market position"
]
# Optional: corresponding images for each task
charts = ["q1_chart.png", "q2_chart.png", "q3_chart.png", "q4_chart.png"]
# Batch processing with dashboard monitoring
results = workflow.batch_run(tasks, imgs=charts)
print(f"Completed {len(results)} quarterly analyses")
for i, result in enumerate(results):
print(f"\nQ{i+1} Analysis Results:")
print(result)
```
### Multimodal Analysis
```python
# Analyze financial charts with multiple specialized agents
workflow = ConcurrentWorkflow(
agents=[technical_analyst, fundamental_analyst, sentiment_analyst],
show_dashboard=True,
output_type="dict"
)
# Analyze a single chart
result = workflow.run(
task="Analyze this stock chart and provide trading insights",
img="stock_chart.png"
)
# Analyze multiple charts
result = workflow.run(
task="Compare these three charts and identify patterns",
imgs=["chart1.png", "chart2.png", "chart3.png"]
)
```
### Error Handling and Monitoring
```python
# Workflow with comprehensive error handling
workflow = ConcurrentWorkflow(
agents=agents,
show_dashboard=True, # Monitor execution in real-time
auto_save=True, # Preserve results even if errors occur
output_type="dict" # Get structured results for easier processing
)
try:
result = workflow.run("Complex analysis task")
# Check for errors in results
for agent_name, output in result.items():
if output.startswith("Error:"):
print(f"Agent {agent_name} failed: {output}")
else:
print(f"Agent {agent_name} completed successfully")
except Exception as e:
print(f"Workflow execution failed: {str(e)}")
# Results may still be available for successful agents
```
## Performance Tips
1. **Agent Count**: Use 2+ agents to benefit from concurrent execution
2. **CPU Utilization**: Workflow automatically optimizes for available cores
3. **Dashboard Overhead**: Minimal performance impact for valuable monitoring
4. **Memory Management**: Clear conversation history for very large batch jobs
5. **Error Recovery**: Failed agents don't stop successful ones
## Use Cases
- **Multi-perspective Analysis**: Financial, legal, technical reviews
- **Consensus Building**: Voting systems and decision making
- **Parallel Processing**: Data analysis and batch operations
- **A/B Testing**: Different agent configurations and strategies
- **Redundancy**: Reliability improvements through multiple agents
- **Real-time Monitoring**: Development and debugging workflows
This guide demonstrates how to effectively use the ConcurrentWorkflow architecture with its advanced dashboard feature for parallel processing of complex tasks using multiple specialized agents.

@ -160,7 +160,6 @@ Runs agents with system resource monitoring and adaptive batch sizing.
## Performance Considerations
- All functions are decorated with `@profile_func` for performance monitoring
- Default batch sizes and worker counts are optimized based on CPU cores
- Resource monitoring helps prevent system overload
- Using `uvloop` provides better performance than standard `asyncio`

@ -33,14 +33,13 @@ agent = Agent(
- Performance attribution
You communicate in precise, technical terms while maintaining clarity for stakeholders.""",
model_name="claude-3-sonnet-20240229",
model_name="gemini-2.5-flash",
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
streaming_on=True,
streaming_on=False,
max_loops="auto",
print_on=True,
telemetry_enable=False,
# event_listeners=[],
interactive=True,
no_reasoning_prompt=True,
# dashboard=True
)

@ -0,0 +1,14 @@
from swarms.structs.heavy_swarm import HeavySwarm
swarm = HeavySwarm(
worker_model_name="claude-3-5-sonnet-20240620",
show_dashboard=True,
question_agent_model_name="gpt-4.1",
loops_per_agent=1,
)
out = swarm.run(
"List the top 5 gold and commodity ETFs with the best performance and lowest expense ratios. For each ETF, provide the ticker symbol, full name, current price, 1-year and 5-year returns (in %), and the expense ratio. Also, specify which major brokerages (e.g., Fidelity, Schwab, Vanguard, E*TRADE) offer these ETFs for purchase. Present your findings in a clear, structured table."
)
print(out)

@ -305,7 +305,7 @@ class Agent:
id: Optional[str] = agent_id(),
llm: Optional[Any] = None,
template: Optional[str] = None,
max_loops: Optional[int] = 1,
max_loops: Optional[Union[int, str]] = 1,
stopping_condition: Optional[Callable[[str], bool]] = None,
loop_interval: Optional[int] = 0,
retry_attempts: Optional[int] = 3,
@ -433,6 +433,7 @@ class Agent:
summarize_multiple_images: bool = False,
tool_retry_attempts: int = 3,
speed_mode: str = None,
reasoning_prompt_on: bool = True,
*args,
**kwargs,
):
@ -574,6 +575,7 @@ class Agent:
self.summarize_multiple_images = summarize_multiple_images
self.tool_retry_attempts = tool_retry_attempts
self.speed_mode = speed_mode
self.reasoning_prompt_on = reasoning_prompt_on
# Initialize the feedback
self.feedback = []
@ -592,7 +594,13 @@ class Agent:
if exists(self.sop) or exists(self.sop_list):
self.handle_sop_ops()
if self.max_loops >= 2:
if self.interactive is True:
self.reasoning_prompt_on = False
if self.reasoning_prompt_on is True and (
(isinstance(self.max_loops, int) and self.max_loops >= 2)
or self.max_loops == "auto"
):
self.system_prompt += generate_reasoning_prompt(
self.max_loops
)
@ -1045,18 +1053,27 @@ class Agent:
):
loop_count += 1
if self.max_loops >= 2:
self.short_memory.add(
role=self.agent_name,
content=f"Current Internal Reasoning Loop: {loop_count}/{self.max_loops}",
)
if (
isinstance(self.max_loops, int)
and self.max_loops >= 2
):
if self.reasoning_prompt_on is True:
self.short_memory.add(
role=self.agent_name,
content=f"Current Internal Reasoning Loop: {loop_count}/{self.max_loops}",
)
# If it is the final loop, then add the final loop message
if loop_count >= 2 and loop_count == self.max_loops:
self.short_memory.add(
role=self.agent_name,
content=f"🎉 Final Internal Reasoning Loop: {loop_count}/{self.max_loops} Prepare your comprehensive response.",
)
if (
loop_count >= 2
and isinstance(self.max_loops, int)
and loop_count == self.max_loops
):
if self.reasoning_prompt_on is True:
self.short_memory.add(
role=self.agent_name,
content=f"🎉 Final Internal Reasoning Loop: {loop_count}/{self.max_loops} Prepare your comprehensive response.",
)
# Dynamic temperature
if self.dynamic_temperature_enabled is True:
@ -1191,6 +1208,7 @@ class Agent:
break
if self.interactive:
# logger.info("Interactive mode enabled.")
user_input = input("You: ")
@ -1943,7 +1961,7 @@ class Agent:
"""Upddate the system message"""
self.system_prompt = system_prompt
def update_max_loops(self, max_loops: int):
def update_max_loops(self, max_loops: Union[int, str]):
"""Update the max loops"""
self.max_loops = max_loops

@ -6,7 +6,7 @@ from typing import Any, Callable, List, Optional, Union
import schedule
from loguru import logger
from swarms import Agent
# from swarms import Agent
class CronJobError(Exception):
@ -49,7 +49,7 @@ class CronJob:
def __init__(
self,
agent: Optional[Union[Agent, Callable]] = None,
agent: Optional[Union[Any, Callable]] = None,
interval: Optional[str] = None,
job_id: Optional[str] = None,
):
@ -238,7 +238,7 @@ class CronJob:
"""
try:
logger.debug(f"Executing task for job {self.job_id}")
if isinstance(self.agent, Agent):
if isinstance(self.agent, Callable):
return self.agent.run(task=task, **kwargs)
else:
return self.agent(task, **kwargs)

@ -14,7 +14,6 @@ from swarms.utils.file_processing import (
from swarms.utils.parse_code import extract_code_from_markdown
from swarms.utils.pdf_to_text import pdf_to_text
from swarms.utils.try_except_wrapper import try_except_wrapper
from swarms.utils.calculate_func_metrics import profile_func
from swarms.utils.litellm_tokenizer import count_tokens
from swarms.utils.output_types import HistoryOutputType
from swarms.utils.history_output_formatter import (
@ -38,7 +37,6 @@ __all__ = [
"extract_code_from_markdown",
"pdf_to_text",
"try_except_wrapper",
"profile_func",
"count_tokens",
"HistoryOutputType",
"history_output_formatter",

@ -1,171 +0,0 @@
import time
import tracemalloc
from functools import wraps
from typing import Any, Callable
import psutil
from pydantic import BaseModel
from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger(log_folder="calculate_func_metrics")
class FunctionMetrics(BaseModel):
execution_time: float
memory_usage: float
cpu_usage: float
io_operations: int
function_calls: int
def profile_func(func):
"""
Decorator function that profiles the execution of a given function.
Args:
func: The function to be profiled.
Returns:
A wrapper function that profiles the execution of the given function and returns the result along with the metrics.
"""
def wrapper(*args, **kwargs):
# Record the initial time, memory usage, CPU usage, and I/O operations
start_time = time.time()
start_mem = psutil.Process().memory_info().rss
start_cpu = psutil.cpu_percent()
start_io = (
psutil.disk_io_counters().read_count
+ psutil.disk_io_counters().write_count
)
# Call the function
result = func(*args, **kwargs)
# Record the final time, memory usage, CPU usage, and I/O operations
end_time = time.time()
end_mem = psutil.Process().memory_info().rss
end_cpu = psutil.cpu_percent()
end_io = (
psutil.disk_io_counters().read_count
+ psutil.disk_io_counters().write_count
)
# Calculate the execution time, memory usage, CPU usage, and I/O operations
execution_time = end_time - start_time
memory_usage = (end_mem - start_mem) / (
1024**2
) # Convert bytes to MiB
cpu_usage = end_cpu - start_cpu
io_operations = end_io - start_io
# Return the metrics as a FunctionMetrics object
metrics = FunctionMetrics(
execution_time=execution_time,
memory_usage=memory_usage,
cpu_usage=cpu_usage,
io_operations=io_operations,
function_calls=1, # Each call to the function counts as one function call
)
json_data = metrics.model_dump_json(indent=4)
logger.info(f"Function metrics: {json_data}")
return result, metrics
return wrapper
def profile_all(func: Callable) -> Callable:
"""
A decorator to profile memory usage, CPU usage, and I/O operations
of a function and log the data using loguru.
It combines tracemalloc for memory profiling, psutil for CPU and I/O operations,
and measures execution time.
Args:
func (Callable): The function to be profiled.
Returns:
Callable: The wrapped function with profiling enabled.
"""
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
# Start memory tracking
tracemalloc.start()
# Get initial CPU stats
process = psutil.Process()
initial_cpu_times = process.cpu_times()
# Get initial I/O stats if available
try:
initial_io_counters = process.io_counters()
io_tracking_available = True
except AttributeError:
logger.warning(
"I/O counters not available on this platform."
)
io_tracking_available = False
# Start timing the function execution
start_time = time.time()
# Execute the function
result = func(*args, **kwargs)
# Stop timing
end_time = time.time()
execution_time = end_time - start_time
# Get final CPU stats
final_cpu_times = process.cpu_times()
# Get final I/O stats if available
if io_tracking_available:
final_io_counters = process.io_counters()
io_read_count = (
final_io_counters.read_count
- initial_io_counters.read_count
)
io_write_count = (
final_io_counters.write_count
- initial_io_counters.write_count
)
else:
io_read_count = io_write_count = 0
# Get memory usage statistics
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics("lineno")
# Calculate CPU usage
cpu_usage = (
final_cpu_times.user
- initial_cpu_times.user
+ final_cpu_times.system
- initial_cpu_times.system
)
# Log the data
logger.info(f"Execution time: {execution_time:.4f} seconds")
logger.info(f"CPU usage: {cpu_usage:.2f} seconds")
if io_tracking_available:
logger.info(
f"I/O Operations - Read: {io_read_count}, Write: {io_write_count}"
)
logger.info("Top memory usage:")
for stat in top_stats[:10]:
logger.info(stat)
# Stop memory tracking
tracemalloc.stop()
return result
return wrapper

@ -1,66 +0,0 @@
import time
from typing import Any, Callable, Type, Union, Tuple
from loguru import logger
def retry_function(
func: Callable,
*args: Any,
max_retries: int = 3,
delay: float = 1.0,
backoff_factor: float = 2.0,
exceptions: Union[
Type[Exception], Tuple[Type[Exception], ...]
] = Exception,
**kwargs: Any,
) -> Any:
"""
A function that retries another function if it raises specified exceptions.
Args:
func (Callable): The function to retry
*args: Positional arguments to pass to the function
max_retries (int): Maximum number of retries before giving up. Defaults to 3.
delay (float): Initial delay between retries in seconds. Defaults to 1.0.
backoff_factor (float): Multiplier applied to delay between retries. Defaults to 2.0.
exceptions (Exception or tuple): Exception(s) that trigger a retry. Defaults to Exception.
**kwargs: Keyword arguments to pass to the function
Returns:
Any: The return value of the function if successful
Example:
def fetch_data(url: str) -> dict:
return requests.get(url).json()
# Retry the fetch_data function
result = retry_function(
fetch_data,
"https://api.example.com",
max_retries=3,
exceptions=(ConnectionError, TimeoutError)
)
"""
retries = 0
current_delay = delay
while True:
try:
return func(*args, **kwargs)
except exceptions as e:
retries += 1
if retries > max_retries:
logger.error(
f"Function {func.__name__} failed after {max_retries} retries. "
f"Final error: {str(e)}"
)
raise
logger.warning(
f"Retry {retries}/{max_retries} for function {func.__name__} "
f"after error: {str(e)}. "
f"Waiting {current_delay} seconds..."
)
time.sleep(current_delay)
current_delay *= backoff_factor

@ -0,0 +1,428 @@
"""
Simple workspace management functions for creating files and folders.
Raw utility functions for easy file and folder creation operations.
"""
import json
import yaml
from pathlib import Path
from typing import Optional, Dict, Any
def create_folder(
folder_name: str, parent_path: Optional[str] = None
) -> Path:
"""
Create a new folder.
Args:
folder_name: Name of the folder to create
parent_path: Parent directory path. If None, creates in current directory.
Returns:
Path object of the created folder
"""
if parent_path:
folder_path = Path(parent_path) / folder_name
else:
folder_path = Path(folder_name)
folder_path.mkdir(parents=True, exist_ok=True)
return folder_path
def file_exists(
file_name: str, parent_path: Optional[str] = None
) -> bool:
"""
Check if a file exists.
Args:
file_name: Name of the file to check
parent_path: Parent directory path. If None, checks in current directory.
Returns:
True if file exists, False otherwise
"""
if parent_path:
file_path = Path(parent_path) / file_name
else:
file_path = Path(file_name)
return file_path.exists() and file_path.is_file()
def update_file(
file_name: str, content: str, parent_path: Optional[str] = None
) -> Path:
"""
Update an existing file with new content.
Args:
file_name: Name of the file to update
content: New content to write to the file
parent_path: Parent directory path. If None, updates in current directory.
Returns:
Path object of the updated file
Raises:
FileNotFoundError: If file doesn't exist
"""
if parent_path:
file_path = Path(parent_path) / file_name
else:
file_path = Path(file_name)
if not file_path.exists():
raise FileNotFoundError(f"File {file_path} does not exist")
file_path.write_text(content, encoding="utf-8")
return file_path
def create_or_update_file(
file_name: str,
content: str = "",
parent_path: Optional[str] = None,
) -> Path:
"""
Create a new file or update existing file with content.
Args:
file_name: Name of the file to create or update
content: Content to write to the file
parent_path: Parent directory path. If None, creates/updates in current directory.
Returns:
Path object of the created or updated file
"""
if parent_path:
file_path = Path(parent_path) / file_name
else:
file_path = Path(file_name)
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_text(content, encoding="utf-8")
return file_path
def create_file(
file_name: str,
content: str = "",
parent_path: Optional[str] = None,
) -> Path:
"""
Create a new file with content.
Args:
file_name: Name of the file to create
content: Content to write to the file
parent_path: Parent directory path. If None, creates in current directory.
Returns:
Path object of the created file
"""
if parent_path:
file_path = Path(parent_path) / file_name
else:
file_path = Path(file_name)
if file_path.exists():
raise FileExistsError(
f"File {file_path} already exists. Use create_or_update_file() to update existing files."
)
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_text(content, encoding="utf-8")
return file_path
def create_python_file(
file_name: str,
content: str = "",
parent_path: Optional[str] = None,
) -> Path:
"""
Create a Python file with content.
Args:
file_name: Name of the file (with or without .py extension)
content: Python code content
parent_path: Parent directory path
Returns:
Path object of the created Python file
"""
if not file_name.endswith(".py"):
file_name += ".py"
return create_file(file_name, content, parent_path)
def create_or_update_python_file(
file_name: str,
content: str = "",
parent_path: Optional[str] = None,
) -> Path:
"""
Create a Python file or update existing Python file with content.
Args:
file_name: Name of the file (with or without .py extension)
content: Python code content
parent_path: Parent directory path
Returns:
Path object of the created or updated Python file
"""
if not file_name.endswith(".py"):
file_name += ".py"
return create_or_update_file(file_name, content, parent_path)
def create_json_file(
file_name: str,
data: Dict[str, Any],
parent_path: Optional[str] = None,
) -> Path:
"""
Create a JSON file with data.
Args:
file_name: Name of the file (with or without .json extension)
data: Dictionary data to serialize to JSON
parent_path: Parent directory path
Returns:
Path object of the created JSON file
"""
if not file_name.endswith(".json"):
file_name += ".json"
content = json.dumps(data, indent=2, ensure_ascii=False)
return create_file(file_name, content, parent_path)
def create_or_update_json_file(
file_name: str,
data: Dict[str, Any],
parent_path: Optional[str] = None,
) -> Path:
"""
Create a JSON file or update existing JSON file with data.
Args:
file_name: Name of the file (with or without .json extension)
data: Dictionary data to serialize to JSON
parent_path: Parent directory path
Returns:
Path object of the created or updated JSON file
"""
if not file_name.endswith(".json"):
file_name += ".json"
content = json.dumps(data, indent=2, ensure_ascii=False)
return create_or_update_file(file_name, content, parent_path)
def create_yaml_file(
file_name: str,
data: Dict[str, Any],
parent_path: Optional[str] = None,
) -> Path:
"""
Create a YAML file with data.
Args:
file_name: Name of the file (with or without .yaml/.yml extension)
data: Dictionary data to serialize to YAML
parent_path: Parent directory path
Returns:
Path object of the created YAML file
"""
if not (
file_name.endswith(".yaml") or file_name.endswith(".yml")
):
file_name += ".yaml"
content = yaml.dump(
data, default_flow_style=False, allow_unicode=True
)
return create_file(file_name, content, parent_path)
def create_or_update_yaml_file(
file_name: str,
data: Dict[str, Any],
parent_path: Optional[str] = None,
) -> Path:
"""
Create a YAML file or update existing YAML file with data.
Args:
file_name: Name of the file (with or without .yaml/.yml extension)
data: Dictionary data to serialize to YAML
parent_path: Parent directory path
Returns:
Path object of the created or updated YAML file
"""
if not (
file_name.endswith(".yaml") or file_name.endswith(".yml")
):
file_name += ".yaml"
content = yaml.dump(
data, default_flow_style=False, allow_unicode=True
)
return create_or_update_file(file_name, content, parent_path)
def create_markdown_file(
file_name: str,
content: str = "",
parent_path: Optional[str] = None,
) -> Path:
"""
Create a Markdown file with content.
Args:
file_name: Name of the file (with or without .md extension)
content: Markdown content
parent_path: Parent directory path
Returns:
Path object of the created Markdown file
"""
if not file_name.endswith(".md"):
file_name += ".md"
return create_file(file_name, content, parent_path)
def create_or_update_markdown_file(
file_name: str,
content: str = "",
parent_path: Optional[str] = None,
) -> Path:
"""
Create a Markdown file or update existing Markdown file with content.
Args:
file_name: Name of the file (with or without .md extension)
content: Markdown content
parent_path: Parent directory path
Returns:
Path object of the created or updated Markdown file
"""
if not file_name.endswith(".md"):
file_name += ".md"
return create_or_update_file(file_name, content, parent_path)
def create_text_file(
file_name: str,
content: str = "",
parent_path: Optional[str] = None,
) -> Path:
"""
Create a text file with content.
Args:
file_name: Name of the file (with or without .txt extension)
content: Text content
parent_path: Parent directory path
Returns:
Path object of the created text file
"""
if not file_name.endswith(".txt"):
file_name += ".txt"
return create_file(file_name, content, parent_path)
def create_or_update_text_file(
file_name: str,
content: str = "",
parent_path: Optional[str] = None,
) -> Path:
"""
Create a text file or update existing text file with content.
Args:
file_name: Name of the file (with or without .txt extension)
content: Text content
parent_path: Parent directory path
Returns:
Path object of the created or updated text file
"""
if not file_name.endswith(".txt"):
file_name += ".txt"
return create_or_update_file(file_name, content, parent_path)
def create_empty_file(
file_name: str, parent_path: Optional[str] = None
) -> Path:
"""
Create an empty file.
Args:
file_name: Name of the file
parent_path: Parent directory path
Returns:
Path object of the created empty file
"""
return create_file(file_name, "", parent_path)
def create_project_structure(
structure: Dict[str, Any], parent_path: Optional[str] = None
) -> Dict[str, Path]:
"""
Create a nested project structure from a dictionary.
Args:
structure: Dictionary defining the project structure
parent_path: Parent directory path
Returns:
Dictionary mapping structure keys to created Path objects
Example:
structure = {
"src": {
"main.py": "print('Hello World')",
"utils": {
"__init__.py": "",
"helper.py": "def helper(): pass"
}
},
"tests": {
"test_main.py": "import unittest"
},
"README.md": "# My Project"
}
"""
created_paths = {}
base_path = Path(parent_path) if parent_path else Path.cwd()
def _create_structure(structure_dict, current_path):
for key, value in structure_dict.items():
item_path = current_path / key
if isinstance(value, dict):
# It's a folder
item_path.mkdir(parents=True, exist_ok=True)
created_paths[key] = item_path
_create_structure(value, item_path)
else:
# It's a file
content = str(value) if value is not None else ""
item_path.parent.mkdir(parents=True, exist_ok=True)
item_path.write_text(content, encoding="utf-8")
created_paths[key] = item_path
_create_structure(structure, base_path)
return created_paths
Loading…
Cancel
Save