From baa42150837de46fca6a6f57e625b05d3c8c3cbf Mon Sep 17 00:00:00 2001 From: harshalmore31 <86048671+harshalmore31@users.noreply.github.com> Date: Wed, 25 Dec 2024 00:33:18 +0530 Subject: [PATCH] ui-v2 --- swarms/structs/ui/ui.py | 235 +++++++++++++++------------------------- 1 file changed, 88 insertions(+), 147 deletions(-) diff --git a/swarms/structs/ui/ui.py b/swarms/structs/ui/ui.py index f5ff1aa0..b0c2c543 100644 --- a/swarms/structs/ui/ui.py +++ b/swarms/structs/ui/ui.py @@ -214,27 +214,6 @@ def initialize_agents( return agents -def get_safe_filename(base_name: str) -> str: - """ - Create a safe filename by removing or replacing invalid characters. - - Args: - base_name: The original filename - - Returns: - A sanitized filename safe for all operating systems - """ - # Replace invalid characters with underscores - invalid_chars = '<>:"/\\|?*' - filename = ''.join('_' if c in invalid_chars else c for c in base_name) - - # Ensure the filename isn't too long (max 255 characters) - if len(filename) > 255: - name_part, ext_part = os.path.splitext(filename) - filename = name_part[:255-len(ext_part)] + ext_part - - return filename - async def execute_task(task: str, max_loops: int, data_temp: float, sum_temp: float, swarm_type: str, agent_keys: List[str], flow: str = None) -> Tuple[Dict[str, str], 'SwarmRouter', str]: """ @@ -269,75 +248,86 @@ async def execute_task(task: str, max_loops: int, data_temp: float, sum_temp: fl output_dir = "swarm_outputs" os.makedirs(output_dir, exist_ok=True) - # Create a simple filename with just a timestamp for uniqueness + # Create a sanitized filename using only safe characters timestamp = time.strftime("%Y%m%d_%H%M%S") - output_file = f"output_{timestamp}.csv" + output_file = f"swarm_output_{timestamp}.csv" output_path = os.path.join(output_dir, output_file) - # Initialize SpreadSheetSwarm with the model + # Validate the output path + try: + # Ensure the path is valid and writable + with open(output_path, 'w') as f: + pass + os.remove(output_path) # Clean up the test file + except OSError as e: + error_msg = f"Invalid output path: {str(e)}" + log_event("error", error_msg) + return {}, None, error_msg + + # Create and initialize SpreadSheetSwarm try: swarm = SpreadSheetSwarm( agents=agents, max_loops=max_loops, name="spreadsheet-swarm", description="SpreadSheet processing workflow", - save_file_path=output_path, # Use our custom output path + save_file_path=output_path, workspace_dir=output_dir, llm=model, - autosave=True, - # Remove append_timestamp and append_run_id as they might not be supported + autosave=True ) - - # Set the filename directly on the swarm object if possible - if hasattr(swarm, 'filename'): - swarm.filename = output_file - - # Execute the swarm with task + except Exception as e: + error_msg = f"Failed to initialize SpreadSheetSwarm: {str(e)}" + log_event("error", error_msg) + return {}, None, error_msg + + # Execute the swarm with proper error handling + try: result = await asyncio.wait_for( asyncio.to_thread(lambda: swarm.run(task=task)), - timeout=900 + timeout=900 # 15 minutes timeout ) - # Verify the file exists and handle potential filename changes - actual_output_path = output_path + # Verify the output file was created if not os.path.exists(output_path): - # Look for files matching our base pattern - possible_files = [f for f in os.listdir(output_dir) if f.startswith("output_")] - if possible_files: - actual_output_path = os.path.join(output_dir, possible_files[-1]) - - # Process SpreadSheetSwarm result - try: - if isinstance(result, dict): - processed_result = { - "CSV File Path": actual_output_path, - "Status": "Success", - "Message": "Spreadsheet processing completed successfully", - "Analysis": result.get("analysis", "No analysis provided"), - "Summary": result.get("summary", "No summary provided") - } - else: - processed_result = { - "CSV File Path": actual_output_path, - "Status": "Success", - "Message": "Spreadsheet processing completed successfully", - "Result": str(result) - } - return processed_result, swarm, "" - except Exception as e: - error_msg = f"Failed to process SpreadSheetSwarm result: {str(e)}" + error_msg = "Output file was not created" log_event("error", error_msg) return {}, None, error_msg + return { + "CSV File Path": output_path, + "Status": "Success", + "Message": "Spreadsheet processing completed successfully", + "Result": str(result) + }, swarm, "" + + except asyncio.TimeoutError: + error_msg = "SpreadSheetSwarm execution timed out after 900 seconds" + log_event("error", error_msg) + return {}, None, error_msg except Exception as e: error_msg = f"SpreadSheetSwarm execution error: {str(e)}" log_event("error", error_msg) return {}, None, error_msg + + # Rest of the swarm type handling... + elif swarm_type == "AgentRearrange": + if not flow: + return {}, None, "Flow configuration is required for AgentRearrange" + router_kwargs["flow"] = flow + + elif swarm_type == "MixtureOfAgents": + if len(agents) < 2: + return {}, None, "MixtureOfAgents requires at least 2 agents" + router_kwargs.update({ + "aggregator_agent": agents[-1], + "layers": max_loops + }) # Create router and execute task for non-SpreadSheetSwarm types if swarm_type != "SpreadSheetSwarm": try: - timeout = 450 + timeout = 450 # Default timeout await asyncio.sleep(0.5) router = SwarmRouter(**router_kwargs) @@ -348,35 +338,25 @@ async def execute_task(task: str, max_loops: int, data_temp: float, sum_temp: fl timeout=timeout ) - # Process results based on swarm type - if swarm_type == "ConcurrentWorkflow": - responses = _extract_concurrent_responses(str(result), agents) - elif swarm_type == "SequentialWorkflow": - if isinstance(result, dict): - responses = {f"Step {i+1}": str(v) for i, v in enumerate(result.values())} - else: - responses = {"Final Output": str(result)} - elif swarm_type == "AgentRearrange": - if isinstance(result, dict): - responses = {f"Step {i+1}": str(v) for i, v in enumerate(result.values())} - else: - flow_steps = flow.split("->") - responses = {f"Step {i+1} ({step.strip()})": str(part) - for i, (step, part) in enumerate(zip(flow_steps, str(result).split("[NEXT]")))} - elif swarm_type == "MixtureOfAgents": - if isinstance(result, dict): - responses = { - **{f"Agent {i+1}": str(v) for i, v in enumerate(result.get("individual_outputs", []))}, - "Aggregated Summary": str(result.get("aggregated_output", "No aggregated output")) - } - else: - responses = {"Final Output": str(result)} - else: # Auto or unknown type + # Process results + def process_result(result): + """Process and standardize the result output.""" + if isinstance(result, str): + try: + # Attempt to parse as JSON + parsed_result = json.loads(result) + return parsed_result + except json.JSONDecodeError: + # Fallback to string result + return {"Final Output": result} + if isinstance(result, dict): - responses = {str(k): str(v) for k, v in result.items()} - else: - responses = {"Final Output": str(result)} - + # Clean and standardize dictionary results + return {str(k): str(v) for k, v in result.items()} + + return {"Final Output": str(result)} + + responses = process_result(result) return responses, router, "" except asyncio.TimeoutError: @@ -738,67 +718,28 @@ def create_app(): # Format output based on swarm type output_lines = [] - - if swarm_type == "SpreadSheetSwarm": - output_lines.append("=== Spreadsheet Swarm Results ===\n") - output_lines.append(f"CSV File: {responses.get('CSV File Path', 'No file generated')}") - output_lines.append(f"Status: {responses.get('Status', 'Unknown')}") - output_lines.append(f"Message: {responses.get('Message', '')}") - - if 'Analysis' in responses: - output_lines.append("\n=== Analysis ===") - output_lines.append(responses['Analysis']) - - if 'Summary' in responses: - output_lines.append("\n=== Summary ===") - output_lines.append(responses['Summary']) - - if 'Result' in responses: - output_lines.append("\n=== Additional Results ===") - output_lines.append(responses['Result']) - - elif swarm_type == "ConcurrentWorkflow": - output_lines.append("=== Concurrent Workflow Results ===\n") - for agent_name, response in responses.items(): - output_lines.append(f"\n--- {agent_name} ---") - output_lines.append(response.strip()) - output_lines.append("-" * 50) - - elif swarm_type == "SequentialWorkflow": - output_lines.append("=== Sequential Workflow Results ===\n") - for step, response in responses.items(): - output_lines.append(f"\n--- {step} ---") - output_lines.append(response.strip()) - output_lines.append("-" * 50) - - elif swarm_type == "AgentRearrange": - output_lines.append("=== Agent Rearrange Results ===\n") - for step, response in responses.items(): - output_lines.append(f"\n--- {step} ---") - output_lines.append(response.strip()) - output_lines.append("-" * 50) - - elif swarm_type == "MixtureOfAgents": - output_lines.append("=== Mixture of Agents Results ===\n") - # First show individual agent outputs + if router.swarm_type == "AgentRearrange": + for key, value in responses.items(): + output_lines.append(f"### Step {key} ###\n{value}\n{'='*50}\n") + elif router.swarm_type == "MixtureOfAgents": + # Add individual agent outputs for key, value in responses.items(): if key != "Aggregated Summary": - output_lines.append(f"\n--- {key} ---") - output_lines.append(value.strip()) - output_lines.append("-" * 50) - - # Then show the aggregated summary at the end + output_lines.append(f"### {key} ###\n{value}\n") + # Add aggregated summary at the end if "Aggregated Summary" in responses: - output_lines.append("\n=== Aggregated Summary ===") - output_lines.append(responses["Aggregated Summary"]) - output_lines.append("=" * 50) - - else: # Auto or unknown type - output_lines.append("=== Results ===\n") + output_lines.append(f"\n### Aggregated Summary ###\n{responses['Aggregated Summary']}\n{'='*50}\n") + elif router.swarm_type == "SpreadSheetSwarm": + output_lines.append(f"### Spreadsheet Output ###\n{responses.get('CSV File Path', 'No file path provided')}\n{'='*50}\n") + elif router.swarm_type == "ConcurrentWorkflow": for key, value in responses.items(): - output_lines.append(f"\n--- {key} ---") - output_lines.append(value.strip()) - output_lines.append("-" * 50) + output_lines.append(f"### {key} ###\n{value}\n{'='*50}\n") + else: # SequentialWorkflow, auto + if isinstance(responses, dict): + for key, value in responses.items(): + output_lines.append(f"### {key} ###\n{value}\n{'='*50}\n") + else: + output_lines.append(str(responses)) yield "\n".join(output_lines), "Completed"