From d6ef64eb4afb42e7d3cece7481560a5d0b508e19 Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Tue, 29 Jul 2025 11:06:06 -0700 Subject: [PATCH] [KAIZEN][Delete retry_func] [delete profile_all func] [docs back to their place] --- docs/llm.txt | 1 - docs/swarms/examples/concurrent_workflow.md | 257 +++++++++-- .../structs/various_execution_methods.md | 1 - example.py | 9 +- .../graph/graph_workflow_basic.py | 0 .../graph/graph_workflow_example.py | 0 heavy_swarm_example.py | 14 + swarms/structs/agent.py | 44 +- swarms/structs/cron_job.py | 6 +- swarms/utils/__init__.py | 2 - swarms/utils/calculate_func_metrics.py | 171 ------- swarms/utils/retry_func.py | 66 --- swarms/utils/workspace_manager.py | 428 ++++++++++++++++++ .../test_conversation.py | 0 14 files changed, 706 insertions(+), 293 deletions(-) rename examples/multi_agent/{ => graphworkflow_examples}/graph/graph_workflow_basic.py (100%) rename graph_workflow_example.py => examples/multi_agent/graphworkflow_examples/graph/graph_workflow_example.py (100%) create mode 100644 heavy_swarm_example.py delete mode 100644 swarms/utils/calculate_func_metrics.py delete mode 100644 swarms/utils/retry_func.py create mode 100644 swarms/utils/workspace_manager.py rename test_conversation.py => tests/test_conversation.py (100%) diff --git a/docs/llm.txt b/docs/llm.txt index 4a9ce385..9f1d2fb6 100644 --- a/docs/llm.txt +++ b/docs/llm.txt @@ -40914,7 +40914,6 @@ Runs agents with system resource monitoring and adaptive batch sizing. 
## Performance Considerations -- All functions are decorated with `@profile_func` for performance monitoring - Default batch sizes and worker counts are optimized based on CPU cores - Resource monitoring helps prevent system overload - Using `uvloop` provides better performance than standard `asyncio` diff --git a/docs/swarms/examples/concurrent_workflow.md b/docs/swarms/examples/concurrent_workflow.md index da5b4763..536ae432 100644 --- a/docs/swarms/examples/concurrent_workflow.md +++ b/docs/swarms/examples/concurrent_workflow.md @@ -94,45 +94,149 @@ result = router.run( ### Real-time Dashboard -The ConcurrentWorkflow now includes a real-time dashboard feature that can be enabled by setting `show_dashboard=True`. This provides: +The ConcurrentWorkflow includes a powerful real-time dashboard feature that provides comprehensive monitoring and visualization of agent execution. Enable it by setting `show_dashboard=True` during workflow initialization. -- Live status of each agent's execution -- Progress tracking -- Real-time output visualization -- Task completion metrics +#### Dashboard Features + +- **Live Status Tracking**: Real-time updates showing each agent's execution status +- **Progress Visualization**: Visual indicators of agent progress and completion +- **Output Streaming**: Live display of agent outputs as they're generated +- **Error Monitoring**: Immediate visibility into any agent failures or errors +- **Performance Metrics**: Execution time and completion statistics +- **Clean Display**: Automatic cleanup and formatting for optimal viewing + +#### Dashboard Status Values + +- **"pending"**: Agent is queued but not yet started +- **"running"**: Agent is currently executing the task +- **"completed"**: Agent finished successfully with output +- **"error"**: Agent execution failed with error details + +#### Dashboard Configuration + +```python +# Enable dashboard with custom configuration +workflow = ConcurrentWorkflow( + name="my-workflow", + 
agents=agents, + show_dashboard=True, # Enable real-time monitoring + output_type="dict", # Configure output format + auto_save=True, # Auto-save conversation history +) +``` + +#### Dashboard Behavior + +When `show_dashboard=True`: +- Individual agent print outputs are automatically disabled to prevent conflicts +- Dashboard updates every 100ms for smooth real-time streaming +- Initial dashboard shows all agents as "pending" +- Real-time updates show status changes and output previews +- Final dashboard displays complete results summary +- Automatic cleanup of dashboard resources after completion ### Concurrent Execution -- Multiple agents work simultaneously -- Efficient resource utilization -- Automatic task distribution -- Built-in thread management +- **ThreadPoolExecutor**: Uses 95% of available CPU cores for optimal performance +- **True Parallelism**: Agents execute simultaneously, not sequentially +- **Thread Safety**: Safe concurrent access to shared resources +- **Error Isolation**: Individual agent failures don't affect others +- **Resource Management**: Automatic thread lifecycle management + +### Output Formatting Options + +The workflow supports multiple output aggregation formats: + +- **"dict-all-except-first"**: Dictionary with all agent outputs except the first (default) +- **"dict"**: Complete dictionary with all agent outputs keyed by agent name +- **"str"**: Concatenated string of all agent outputs +- **"list"**: List of individual agent outputs in completion order + +```python +# Configure output format +workflow = ConcurrentWorkflow( + agents=agents, + output_type="dict", # Get complete dictionary of results + show_dashboard=True +) +``` + +### Advanced Features + +#### Auto Prompt Engineering + +Enable automatic prompt optimization for all agents: + +```python +workflow = ConcurrentWorkflow( + agents=agents, + auto_generate_prompts=True, # Enable automatic prompt engineering + show_dashboard=True +) +``` + +#### Conversation History 
Management + +Automatic conversation tracking and persistence: + +```python +workflow = ConcurrentWorkflow( + agents=agents, + auto_save=True, # Auto-save conversation history + metadata_output_path="results.json" # Custom output file path +) +``` + +#### Multimodal Support + +Support for image inputs across all agents: + +```python +# Single image input +result = workflow.run( + task="Analyze this chart", + img="financial_chart.png" +) + +# Multiple image inputs +result = workflow.run( + task="Compare these charts", + imgs=["chart1.png", "chart2.png", "chart3.png"] +) +``` ## Best Practices -1. Task Distribution: - - Break down complex tasks into independent subtasks - - Assign appropriate agents to each subtask - - Ensure tasks can be processed concurrently +### 1. Dashboard Usage + +- **Development & Debugging**: Use dashboard for real-time monitoring during development +- **Production**: Consider disabling dashboard for headless execution in production +- **Performance**: Dashboard adds minimal overhead but provides valuable insights +- **Error Handling**: Dashboard immediately shows which agents fail and why + +### 2. Agent Configuration + +- **Specialization**: Use specialized agents for specific tasks +- **Model Selection**: Choose appropriate models for each agent's role +- **Temperature**: Configure temperature based on task requirements +- **System Prompts**: Write clear, specific system prompts for each agent -2. Agent Configuration: - - Use specialized agents for specific tasks - - Configure appropriate model parameters - - Set meaningful system prompts +### 3. Resource Management -3. 
Resource Management: - - Monitor concurrent execution through the dashboard - - Handle rate limits appropriately - - Manage memory usage +- **CPU Utilization**: Workflow automatically uses 95% of available cores +- **Memory**: Monitor conversation history growth in long-running workflows +- **Rate Limits**: Handle API rate limits appropriately for your LLM provider +- **Error Recovery**: Implement fallback mechanisms for failed agents -4. Error Handling: - - Implement proper error handling - - Log errors and exceptions - - Provide fallback mechanisms +### 4. Task Design -## Example Implementation +- **Independence**: Ensure tasks can be processed concurrently without dependencies +- **Granularity**: Break complex tasks into independent subtasks +- **Balance**: Distribute work evenly across agents for optimal performance -Here's a complete example showing how to use ConcurrentWorkflow for a comprehensive market analysis: +## Example Implementations + +### Comprehensive Market Analysis ```python from swarms import Agent @@ -184,6 +288,8 @@ workflow = ConcurrentWorkflow( agents=[market_analyst, financial_analyst, risk_analyst], max_loops=1, show_dashboard=True, # Enable real-time monitoring + output_type="dict", # Get structured results + auto_save=True, # Save conversation history ) try: @@ -197,13 +303,102 @@ try: # Process and display results print("\nAnalysis Results:") print("=" * 50) - for agent_output in result: - print(f"\nAnalysis from {agent_output['agent']}:") + for agent_name, output in result.items(): + print(f"\nAnalysis from {agent_name}:") print("-" * 40) - print(agent_output['output']) + print(output) except Exception as e: print(f"Error during analysis: {str(e)}") ``` -This guide demonstrates how to effectively use the ConcurrentWorkflow architecture with its new dashboard feature for parallel processing of complex tasks using multiple specialized agents. 
\ No newline at end of file +### Batch Processing with Dashboard + +```python +# Process multiple tasks sequentially with concurrent agent execution +tasks = [ + "Analyze Q1 financial performance and market position", + "Analyze Q2 financial performance and market position", + "Analyze Q3 financial performance and market position", + "Analyze Q4 financial performance and market position" +] + +# Optional: corresponding images for each task +charts = ["q1_chart.png", "q2_chart.png", "q3_chart.png", "q4_chart.png"] + +# Batch processing with dashboard monitoring +results = workflow.batch_run(tasks, imgs=charts) + +print(f"Completed {len(results)} quarterly analyses") +for i, result in enumerate(results): + print(f"\nQ{i+1} Analysis Results:") + print(result) +``` + +### Multimodal Analysis + +```python +# Analyze financial charts with multiple specialized agents +workflow = ConcurrentWorkflow( + agents=[technical_analyst, fundamental_analyst, sentiment_analyst], + show_dashboard=True, + output_type="dict" +) + +# Analyze a single chart +result = workflow.run( + task="Analyze this stock chart and provide trading insights", + img="stock_chart.png" +) + +# Analyze multiple charts +result = workflow.run( + task="Compare these three charts and identify patterns", + imgs=["chart1.png", "chart2.png", "chart3.png"] +) +``` + +### Error Handling and Monitoring + +```python +# Workflow with comprehensive error handling +workflow = ConcurrentWorkflow( + agents=agents, + show_dashboard=True, # Monitor execution in real-time + auto_save=True, # Preserve results even if errors occur + output_type="dict" # Get structured results for easier processing +) + +try: + result = workflow.run("Complex analysis task") + + # Check for errors in results + for agent_name, output in result.items(): + if output.startswith("Error:"): + print(f"Agent {agent_name} failed: {output}") + else: + print(f"Agent {agent_name} completed successfully") + +except Exception as e: + print(f"Workflow execution 
failed: {str(e)}") + # Results may still be available for successful agents +``` + +## Performance Tips + +1. **Agent Count**: Use 2+ agents to benefit from concurrent execution +2. **CPU Utilization**: Workflow automatically optimizes for available cores +3. **Dashboard Overhead**: Minimal performance impact for valuable monitoring +4. **Memory Management**: Clear conversation history for very large batch jobs +5. **Error Recovery**: Failed agents don't stop successful ones + +## Use Cases + +- **Multi-perspective Analysis**: Financial, legal, technical reviews +- **Consensus Building**: Voting systems and decision making +- **Parallel Processing**: Data analysis and batch operations +- **A/B Testing**: Different agent configurations and strategies +- **Redundancy**: Reliability improvements through multiple agents +- **Real-time Monitoring**: Development and debugging workflows + +This guide demonstrates how to effectively use the ConcurrentWorkflow architecture with its advanced dashboard feature for parallel processing of complex tasks using multiple specialized agents. \ No newline at end of file diff --git a/docs/swarms/structs/various_execution_methods.md b/docs/swarms/structs/various_execution_methods.md index f9d2981d..04592a15 100644 --- a/docs/swarms/structs/various_execution_methods.md +++ b/docs/swarms/structs/various_execution_methods.md @@ -160,7 +160,6 @@ Runs agents with system resource monitoring and adaptive batch sizing. 
## Performance Considerations -- All functions are decorated with `@profile_func` for performance monitoring - Default batch sizes and worker counts are optimized based on CPU cores - Resource monitoring helps prevent system overload - Using `uvloop` provides better performance than standard `asyncio` diff --git a/example.py b/example.py index 88e6c7c7..c7114332 100644 --- a/example.py +++ b/example.py @@ -33,14 +33,13 @@ agent = Agent( - Performance attribution You communicate in precise, technical terms while maintaining clarity for stakeholders.""", - model_name="claude-3-sonnet-20240229", + model_name="gemini-2.5-flash", dynamic_temperature_enabled=True, output_type="str-all-except-first", - streaming_on=True, + streaming_on=False, max_loops="auto", - print_on=True, - telemetry_enable=False, - # event_listeners=[], + interactive=True, + no_reasoning_prompt=True, # dashboard=True ) diff --git a/examples/multi_agent/graph/graph_workflow_basic.py b/examples/multi_agent/graphworkflow_examples/graph/graph_workflow_basic.py similarity index 100% rename from examples/multi_agent/graph/graph_workflow_basic.py rename to examples/multi_agent/graphworkflow_examples/graph/graph_workflow_basic.py diff --git a/graph_workflow_example.py b/examples/multi_agent/graphworkflow_examples/graph/graph_workflow_example.py similarity index 100% rename from graph_workflow_example.py rename to examples/multi_agent/graphworkflow_examples/graph/graph_workflow_example.py diff --git a/heavy_swarm_example.py b/heavy_swarm_example.py new file mode 100644 index 00000000..c697a9f2 --- /dev/null +++ b/heavy_swarm_example.py @@ -0,0 +1,14 @@ +from swarms.structs.heavy_swarm import HeavySwarm + +swarm = HeavySwarm( + worker_model_name="claude-3-5-sonnet-20240620", + show_dashboard=True, + question_agent_model_name="gpt-4.1", + loops_per_agent=1, +) + +out = swarm.run( + "List the top 5 gold and commodity ETFs with the best performance and lowest expense ratios. 
For each ETF, provide the ticker symbol, full name, current price, 1-year and 5-year returns (in %), and the expense ratio. Also, specify which major brokerages (e.g., Fidelity, Schwab, Vanguard, E*TRADE) offer these ETFs for purchase. Present your findings in a clear, structured table." +) + +print(out) diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index ce57c65f..43ed4a0d 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -305,7 +305,7 @@ class Agent: id: Optional[str] = agent_id(), llm: Optional[Any] = None, template: Optional[str] = None, - max_loops: Optional[int] = 1, + max_loops: Optional[Union[int, str]] = 1, stopping_condition: Optional[Callable[[str], bool]] = None, loop_interval: Optional[int] = 0, retry_attempts: Optional[int] = 3, @@ -433,6 +433,7 @@ class Agent: summarize_multiple_images: bool = False, tool_retry_attempts: int = 3, speed_mode: str = None, + reasoning_prompt_on: bool = True, *args, **kwargs, ): @@ -574,6 +575,7 @@ class Agent: self.summarize_multiple_images = summarize_multiple_images self.tool_retry_attempts = tool_retry_attempts self.speed_mode = speed_mode + self.reasoning_prompt_on = reasoning_prompt_on # Initialize the feedback self.feedback = [] @@ -592,7 +594,13 @@ class Agent: if exists(self.sop) or exists(self.sop_list): self.handle_sop_ops() - if self.max_loops >= 2: + if self.interactive is True: + self.reasoning_prompt_on = False + + if self.reasoning_prompt_on is True and ( + (isinstance(self.max_loops, int) and self.max_loops >= 2) + or self.max_loops == "auto" + ): self.system_prompt += generate_reasoning_prompt( self.max_loops ) @@ -1045,18 +1053,27 @@ class Agent: ): loop_count += 1 - if self.max_loops >= 2: - self.short_memory.add( - role=self.agent_name, - content=f"Current Internal Reasoning Loop: {loop_count}/{self.max_loops}", - ) + if ( + isinstance(self.max_loops, int) + and self.max_loops >= 2 + ): + if self.reasoning_prompt_on is True: + self.short_memory.add( + 
role=self.agent_name, + content=f"Current Internal Reasoning Loop: {loop_count}/{self.max_loops}", + ) # If it is the final loop, then add the final loop message - if loop_count >= 2 and loop_count == self.max_loops: - self.short_memory.add( - role=self.agent_name, - content=f"🎉 Final Internal Reasoning Loop: {loop_count}/{self.max_loops} Prepare your comprehensive response.", - ) + if ( + loop_count >= 2 + and isinstance(self.max_loops, int) + and loop_count == self.max_loops + ): + if self.reasoning_prompt_on is True: + self.short_memory.add( + role=self.agent_name, + content=f"🎉 Final Internal Reasoning Loop: {loop_count}/{self.max_loops} Prepare your comprehensive response.", + ) # Dynamic temperature if self.dynamic_temperature_enabled is True: @@ -1191,6 +1208,7 @@ class Agent: break if self.interactive: + # logger.info("Interactive mode enabled.") user_input = input("You: ") @@ -1943,7 +1961,7 @@ class Agent: """Upddate the system message""" self.system_prompt = system_prompt - def update_max_loops(self, max_loops: int): + def update_max_loops(self, max_loops: Union[int, str]): """Update the max loops""" self.max_loops = max_loops diff --git a/swarms/structs/cron_job.py b/swarms/structs/cron_job.py index a58b2e2f..a56a813e 100644 --- a/swarms/structs/cron_job.py +++ b/swarms/structs/cron_job.py @@ -6,7 +6,7 @@ from typing import Any, Callable, List, Optional, Union import schedule from loguru import logger -from swarms import Agent +# from swarms import Agent class CronJobError(Exception): @@ -49,7 +49,7 @@ class CronJob: def __init__( self, - agent: Optional[Union[Agent, Callable]] = None, + agent: Optional[Union[Any, Callable]] = None, interval: Optional[str] = None, job_id: Optional[str] = None, ): @@ -238,7 +238,7 @@ class CronJob: """ try: logger.debug(f"Executing task for job {self.job_id}") - if isinstance(self.agent, Agent): + if hasattr(self.agent, "run"): return self.agent.run(task=task, **kwargs) else: return self.agent(task, **kwargs) diff
--git a/swarms/utils/__init__.py b/swarms/utils/__init__.py index f331c6b9..2eb63089 100644 --- a/swarms/utils/__init__.py +++ b/swarms/utils/__init__.py @@ -14,7 +14,6 @@ from swarms.utils.file_processing import ( from swarms.utils.parse_code import extract_code_from_markdown from swarms.utils.pdf_to_text import pdf_to_text from swarms.utils.try_except_wrapper import try_except_wrapper -from swarms.utils.calculate_func_metrics import profile_func from swarms.utils.litellm_tokenizer import count_tokens from swarms.utils.output_types import HistoryOutputType from swarms.utils.history_output_formatter import ( @@ -38,7 +37,6 @@ __all__ = [ "extract_code_from_markdown", "pdf_to_text", "try_except_wrapper", - "profile_func", "count_tokens", "HistoryOutputType", "history_output_formatter", diff --git a/swarms/utils/calculate_func_metrics.py b/swarms/utils/calculate_func_metrics.py deleted file mode 100644 index 795e7bb2..00000000 --- a/swarms/utils/calculate_func_metrics.py +++ /dev/null @@ -1,171 +0,0 @@ -import time -import tracemalloc -from functools import wraps -from typing import Any, Callable - -import psutil -from pydantic import BaseModel - -from swarms.utils.loguru_logger import initialize_logger - -logger = initialize_logger(log_folder="calculate_func_metrics") - - -class FunctionMetrics(BaseModel): - execution_time: float - memory_usage: float - cpu_usage: float - io_operations: int - function_calls: int - - -def profile_func(func): - """ - Decorator function that profiles the execution of a given function. - - Args: - func: The function to be profiled. - - Returns: - A wrapper function that profiles the execution of the given function and returns the result along with the metrics. 
- - """ - - def wrapper(*args, **kwargs): - # Record the initial time, memory usage, CPU usage, and I/O operations - start_time = time.time() - start_mem = psutil.Process().memory_info().rss - start_cpu = psutil.cpu_percent() - start_io = ( - psutil.disk_io_counters().read_count - + psutil.disk_io_counters().write_count - ) - - # Call the function - result = func(*args, **kwargs) - - # Record the final time, memory usage, CPU usage, and I/O operations - end_time = time.time() - end_mem = psutil.Process().memory_info().rss - end_cpu = psutil.cpu_percent() - end_io = ( - psutil.disk_io_counters().read_count - + psutil.disk_io_counters().write_count - ) - - # Calculate the execution time, memory usage, CPU usage, and I/O operations - execution_time = end_time - start_time - memory_usage = (end_mem - start_mem) / ( - 1024**2 - ) # Convert bytes to MiB - cpu_usage = end_cpu - start_cpu - io_operations = end_io - start_io - - # Return the metrics as a FunctionMetrics object - metrics = FunctionMetrics( - execution_time=execution_time, - memory_usage=memory_usage, - cpu_usage=cpu_usage, - io_operations=io_operations, - function_calls=1, # Each call to the function counts as one function call - ) - - json_data = metrics.model_dump_json(indent=4) - - logger.info(f"Function metrics: {json_data}") - - return result, metrics - - return wrapper - - -def profile_all(func: Callable) -> Callable: - """ - A decorator to profile memory usage, CPU usage, and I/O operations - of a function and log the data using loguru. - - It combines tracemalloc for memory profiling, psutil for CPU and I/O operations, - and measures execution time. - - Args: - func (Callable): The function to be profiled. - - Returns: - Callable: The wrapped function with profiling enabled. 
- """ - - @wraps(func) - def wrapper(*args: Any, **kwargs: Any) -> Any: - # Start memory tracking - tracemalloc.start() - - # Get initial CPU stats - process = psutil.Process() - initial_cpu_times = process.cpu_times() - - # Get initial I/O stats if available - try: - initial_io_counters = process.io_counters() - io_tracking_available = True - except AttributeError: - logger.warning( - "I/O counters not available on this platform." - ) - io_tracking_available = False - - # Start timing the function execution - start_time = time.time() - - # Execute the function - result = func(*args, **kwargs) - - # Stop timing - end_time = time.time() - execution_time = end_time - start_time - - # Get final CPU stats - final_cpu_times = process.cpu_times() - - # Get final I/O stats if available - if io_tracking_available: - final_io_counters = process.io_counters() - io_read_count = ( - final_io_counters.read_count - - initial_io_counters.read_count - ) - io_write_count = ( - final_io_counters.write_count - - initial_io_counters.write_count - ) - else: - io_read_count = io_write_count = 0 - - # Get memory usage statistics - snapshot = tracemalloc.take_snapshot() - top_stats = snapshot.statistics("lineno") - - # Calculate CPU usage - cpu_usage = ( - final_cpu_times.user - - initial_cpu_times.user - + final_cpu_times.system - - initial_cpu_times.system - ) - - # Log the data - logger.info(f"Execution time: {execution_time:.4f} seconds") - logger.info(f"CPU usage: {cpu_usage:.2f} seconds") - if io_tracking_available: - logger.info( - f"I/O Operations - Read: {io_read_count}, Write: {io_write_count}" - ) - logger.info("Top memory usage:") - for stat in top_stats[:10]: - logger.info(stat) - - # Stop memory tracking - tracemalloc.stop() - - return result - - return wrapper diff --git a/swarms/utils/retry_func.py b/swarms/utils/retry_func.py deleted file mode 100644 index 2a32903d..00000000 --- a/swarms/utils/retry_func.py +++ /dev/null @@ -1,66 +0,0 @@ -import time -from typing import 
Any, Callable, Type, Union, Tuple -from loguru import logger - - -def retry_function( - func: Callable, - *args: Any, - max_retries: int = 3, - delay: float = 1.0, - backoff_factor: float = 2.0, - exceptions: Union[ - Type[Exception], Tuple[Type[Exception], ...] - ] = Exception, - **kwargs: Any, -) -> Any: - """ - A function that retries another function if it raises specified exceptions. - - Args: - func (Callable): The function to retry - *args: Positional arguments to pass to the function - max_retries (int): Maximum number of retries before giving up. Defaults to 3. - delay (float): Initial delay between retries in seconds. Defaults to 1.0. - backoff_factor (float): Multiplier applied to delay between retries. Defaults to 2.0. - exceptions (Exception or tuple): Exception(s) that trigger a retry. Defaults to Exception. - **kwargs: Keyword arguments to pass to the function - - Returns: - Any: The return value of the function if successful - - Example: - def fetch_data(url: str) -> dict: - return requests.get(url).json() - - # Retry the fetch_data function - result = retry_function( - fetch_data, - "https://api.example.com", - max_retries=3, - exceptions=(ConnectionError, TimeoutError) - ) - """ - retries = 0 - current_delay = delay - - while True: - try: - return func(*args, **kwargs) - except exceptions as e: - retries += 1 - if retries > max_retries: - logger.error( - f"Function {func.__name__} failed after {max_retries} retries. " - f"Final error: {str(e)}" - ) - raise - - logger.warning( - f"Retry {retries}/{max_retries} for function {func.__name__} " - f"after error: {str(e)}. " - f"Waiting {current_delay} seconds..." - ) - - time.sleep(current_delay) - current_delay *= backoff_factor diff --git a/swarms/utils/workspace_manager.py b/swarms/utils/workspace_manager.py new file mode 100644 index 00000000..1b4c7846 --- /dev/null +++ b/swarms/utils/workspace_manager.py @@ -0,0 +1,428 @@ +""" +Simple workspace management functions for creating files and folders. 
+ +Raw utility functions for easy file and folder creation operations. +""" + +import json +import yaml +from pathlib import Path +from typing import Optional, Dict, Any + + +def create_folder( + folder_name: str, parent_path: Optional[str] = None +) -> Path: + """ + Create a new folder. + + Args: + folder_name: Name of the folder to create + parent_path: Parent directory path. If None, creates in current directory. + + Returns: + Path object of the created folder + """ + if parent_path: + folder_path = Path(parent_path) / folder_name + else: + folder_path = Path(folder_name) + + folder_path.mkdir(parents=True, exist_ok=True) + return folder_path + + +def file_exists( + file_name: str, parent_path: Optional[str] = None +) -> bool: + """ + Check if a file exists. + + Args: + file_name: Name of the file to check + parent_path: Parent directory path. If None, checks in current directory. + + Returns: + True if file exists, False otherwise + """ + if parent_path: + file_path = Path(parent_path) / file_name + else: + file_path = Path(file_name) + + return file_path.exists() and file_path.is_file() + + +def update_file( + file_name: str, content: str, parent_path: Optional[str] = None +) -> Path: + """ + Update an existing file with new content. + + Args: + file_name: Name of the file to update + content: New content to write to the file + parent_path: Parent directory path. If None, updates in current directory. + + Returns: + Path object of the updated file + + Raises: + FileNotFoundError: If file doesn't exist + """ + if parent_path: + file_path = Path(parent_path) / file_name + else: + file_path = Path(file_name) + + if not file_path.exists(): + raise FileNotFoundError(f"File {file_path} does not exist") + + file_path.write_text(content, encoding="utf-8") + return file_path + + +def create_or_update_file( + file_name: str, + content: str = "", + parent_path: Optional[str] = None, +) -> Path: + """ + Create a new file or update existing file with content. 
+ + Args: + file_name: Name of the file to create or update + content: Content to write to the file + parent_path: Parent directory path. If None, creates/updates in current directory. + + Returns: + Path object of the created or updated file + """ + if parent_path: + file_path = Path(parent_path) / file_name + else: + file_path = Path(file_name) + + file_path.parent.mkdir(parents=True, exist_ok=True) + file_path.write_text(content, encoding="utf-8") + return file_path + + +def create_file( + file_name: str, + content: str = "", + parent_path: Optional[str] = None, +) -> Path: + """ + Create a new file with content. + + Args: + file_name: Name of the file to create + content: Content to write to the file + parent_path: Parent directory path. If None, creates in current directory. + + Returns: + Path object of the created file + """ + if parent_path: + file_path = Path(parent_path) / file_name + else: + file_path = Path(file_name) + + if file_path.exists(): + raise FileExistsError( + f"File {file_path} already exists. Use create_or_update_file() to update existing files." + ) + + file_path.parent.mkdir(parents=True, exist_ok=True) + file_path.write_text(content, encoding="utf-8") + return file_path + + +def create_python_file( + file_name: str, + content: str = "", + parent_path: Optional[str] = None, +) -> Path: + """ + Create a Python file with content. + + Args: + file_name: Name of the file (with or without .py extension) + content: Python code content + parent_path: Parent directory path + + Returns: + Path object of the created Python file + """ + if not file_name.endswith(".py"): + file_name += ".py" + return create_file(file_name, content, parent_path) + + +def create_or_update_python_file( + file_name: str, + content: str = "", + parent_path: Optional[str] = None, +) -> Path: + """ + Create a Python file or update existing Python file with content. 
def create_json_file(
    file_name: str,
    data: Dict[str, Any],
    parent_path: Optional[str] = None,
) -> Path:
    """
    Create a JSON file with data.

    The data is serialized with a 2-space indent and then handed to
    ``create_file``; how an already-existing file is treated is decided
    there.

    Args:
        file_name: Name of the file (with or without .json extension)
        data: Dictionary data to serialize to JSON
        parent_path: Parent directory path (passed through unchanged)

    Returns:
        Path object of the created JSON file

    Raises:
        TypeError: If ``data`` contains a value ``json.dumps`` cannot
            serialize.
    """
    if not file_name.endswith(".json"):
        file_name += ".json"
    # ensure_ascii=False keeps non-ASCII text readable instead of
    # escaping it as \uXXXX sequences.
    content = json.dumps(data, indent=2, ensure_ascii=False)
    return create_file(file_name, content, parent_path)


def create_or_update_json_file(
    file_name: str,
    data: Dict[str, Any],
    parent_path: Optional[str] = None,
) -> Path:
    """
    Create a JSON file or update existing JSON file with data.

    Same serialization as ``create_json_file`` but delegates to
    ``create_or_update_file``.

    Args:
        file_name: Name of the file (with or without .json extension)
        data: Dictionary data to serialize to JSON
        parent_path: Parent directory path (passed through unchanged)

    Returns:
        Path object of the created or updated JSON file

    Raises:
        TypeError: If ``data`` contains a value ``json.dumps`` cannot
            serialize.
    """
    if not file_name.endswith(".json"):
        file_name += ".json"
    content = json.dumps(data, indent=2, ensure_ascii=False)
    return create_or_update_file(file_name, content, parent_path)


def create_yaml_file(
    file_name: str,
    data: Dict[str, Any],
    parent_path: Optional[str] = None,
) -> Path:
    """
    Create a YAML file with data.

    Args:
        file_name: Name of the file (with or without .yaml/.yml extension)
        data: Dictionary data to serialize to YAML
        parent_path: Parent directory path (passed through unchanged)

    Returns:
        Path object of the created YAML file
    """
    # Both spellings of the extension are accepted as-is; anything else
    # gets ".yaml" appended.
    if not file_name.endswith((".yaml", ".yml")):
        file_name += ".yaml"
    # NOTE(review): yaml.dump (not safe_dump) is kept to preserve the
    # existing output for non-plain Python objects.
    content = yaml.dump(
        data, default_flow_style=False, allow_unicode=True
    )
    return create_file(file_name, content, parent_path)


def create_or_update_yaml_file(
    file_name: str,
    data: Dict[str, Any],
    parent_path: Optional[str] = None,
) -> Path:
    """
    Create a YAML file or update existing YAML file with data.

    Args:
        file_name: Name of the file (with or without .yaml/.yml extension)
        data: Dictionary data to serialize to YAML
        parent_path: Parent directory path (passed through unchanged)

    Returns:
        Path object of the created or updated YAML file
    """
    if not file_name.endswith((".yaml", ".yml")):
        file_name += ".yaml"
    content = yaml.dump(
        data, default_flow_style=False, allow_unicode=True
    )
    return create_or_update_file(file_name, content, parent_path)


def create_markdown_file(
    file_name: str,
    content: str = "",
    parent_path: Optional[str] = None,
) -> Path:
    """
    Create a Markdown file with content.

    Args:
        file_name: Name of the file (with or without .md extension)
        content: Markdown content
        parent_path: Parent directory path (passed through unchanged)

    Returns:
        Path object of the created Markdown file
    """
    if not file_name.endswith(".md"):
        file_name += ".md"
    return create_file(file_name, content, parent_path)


def create_or_update_markdown_file(
    file_name: str,
    content: str = "",
    parent_path: Optional[str] = None,
) -> Path:
    """
    Create a Markdown file or update existing Markdown file with content.

    Args:
        file_name: Name of the file (with or without .md extension)
        content: Markdown content
        parent_path: Parent directory path (passed through unchanged)

    Returns:
        Path object of the created or updated Markdown file
    """
    if not file_name.endswith(".md"):
        file_name += ".md"
    return create_or_update_file(file_name, content, parent_path)


def create_text_file(
    file_name: str,
    content: str = "",
    parent_path: Optional[str] = None,
) -> Path:
    """
    Create a text file with content.

    Args:
        file_name: Name of the file (with or without .txt extension)
        content: Text content
        parent_path: Parent directory path (passed through unchanged)

    Returns:
        Path object of the created text file
    """
    if not file_name.endswith(".txt"):
        file_name += ".txt"
    return create_file(file_name, content, parent_path)


def create_or_update_text_file(
    file_name: str,
    content: str = "",
    parent_path: Optional[str] = None,
) -> Path:
    """
    Create a text file or update existing text file with content.

    Args:
        file_name: Name of the file (with or without .txt extension)
        content: Text content
        parent_path: Parent directory path (passed through unchanged)

    Returns:
        Path object of the created or updated text file
    """
    if not file_name.endswith(".txt"):
        file_name += ".txt"
    return create_or_update_file(file_name, content, parent_path)


def create_empty_file(
    file_name: str, parent_path: Optional[str] = None
) -> Path:
    """
    Create an empty file.

    No extension is added; the name is used exactly as given.

    Args:
        file_name: Name of the file
        parent_path: Parent directory path (passed through unchanged)

    Returns:
        Path object of the created empty file
    """
    return create_file(file_name, "", parent_path)


def create_project_structure(
    structure: Dict[str, Any], parent_path: Optional[str] = None
) -> Dict[str, Path]:
    """
    Create a nested project structure from a dictionary.

    Dictionary values become folders (processed recursively); every
    other value becomes a file whose content is ``str(value)``, or an
    empty file when the value is ``None``. Existing files are
    overwritten and existing folders are reused.

    Args:
        structure: Dictionary defining the project structure
        parent_path: Directory to build the structure in. Defaults to
            the current working directory when omitted or empty.

    Returns:
        Dictionary mapping names to created Path objects. Every entry
        is recorded under two keys:

        * its path relative to the root of ``structure``, joined with
          "/" (e.g. ``"src/utils/helper.py"``) - unique per entry;
        * its bare name (e.g. ``"helper.py"``) - kept for backward
          compatibility; when several entries share a name the one
          created last wins.

        For top-level entries both keys are identical.

    Example:
        structure = {
            "src": {
                "main.py": "print('Hello World')",
                "utils": {
                    "__init__.py": "",
                    "helper.py": "def helper(): pass"
                }
            },
            "tests": {
                "test_main.py": "import unittest"
            },
            "README.md": "# My Project"
        }
    """
    created_paths: Dict[str, Path] = {}
    base_path = Path(parent_path) if parent_path else Path.cwd()

    def _create_structure(structure_dict, current_path, prefix):
        for key, value in structure_dict.items():
            item_path = current_path / key
            # Built from the key chain rather than Path.relative_to so
            # unusual keys can never make the bookkeeping raise.
            relative_key = f"{prefix}/{key}" if prefix else key

            if isinstance(value, dict):
                # It's a folder
                item_path.mkdir(parents=True, exist_ok=True)
            else:
                # It's a file
                content = str(value) if value is not None else ""
                item_path.parent.mkdir(parents=True, exist_ok=True)
                item_path.write_text(content, encoding="utf-8")

            # The bare key alone collides for same-named entries in
            # different folders; the relative key keeps each of them
            # reachable in the result.
            created_paths[relative_key] = item_path
            created_paths[key] = item_path

            if isinstance(value, dict):
                _create_structure(value, item_path, relative_key)

    _create_structure(structure, base_path, "")
    return created_paths