diff --git a/swarms/structs/ui/ui.py b/swarms/structs/ui/ui.py index b0c2c543..d30f8136 100644 --- a/swarms/structs/ui/ui.py +++ b/swarms/structs/ui/ui.py @@ -5,22 +5,10 @@ import json import time import asyncio import gradio as gr -import re - -# Import necessary classes and functions from swarms library from swarms.structs.agent import Agent -from swarms.structs.concurrent_workflow import ConcurrentWorkflow -from swarms.structs.mixture_of_agents import MixtureOfAgents -from swarms.structs.rearrange import AgentRearrange -from swarms.structs.sequential_workflow import SequentialWorkflow -from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm -from swarms.structs.swarm_matcher import swarm_matcher, SwarmMatcher, SwarmMatcherConfig, initialize_swarm_types from swarms.structs.swarm_router import SwarmRouter from swarms.utils.loguru_logger import initialize_logger -from groq_model import OpenAIChat # Import OpenAIChat from the correct location -from swarms.utils.file_processing import create_file_in_folder -from doc_master import doc_master - +from swarm_models import OpenAIChat # Initialize logger @@ -33,6 +21,9 @@ load_dotenv() # Get the OpenAI API key from the environment variable api_key = os.getenv("GROQ_API_KEY") +# changed to swarms_models +# adding functionality to view other models of swarms models + # Model initialization model = OpenAIChat( openai_api_base="https://api.groq.com/openai/v1", @@ -42,21 +33,18 @@ model = OpenAIChat( ) # Define the path to agent_prompts.json + +# locates the json file and then fetches the promopts + PROMPT_JSON_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "agent_prompts.json") logger.info(f"Loading prompts from: {PROMPT_JSON_PATH}") # Global log storage + execution_logs = [] -def log_event(level: str, message: str, metadata: Optional[Dict] = None): - """ - Log an event and store it in the execution logs. - Args: - level: Log level (info, warning, error, etc.) - message: Log message - metadata: Optional metadata dictionary - """ +def log_event(level: str, message: str, metadata: Optional[Dict] = None): timestamp = time.strftime("%Y-%m-%d %H:%M:%S") log_entry = { "timestamp": timestamp, @@ -65,25 +53,12 @@ def log_event(level: str, message: str, metadata: Optional[Dict] = None): "metadata": metadata or {} } execution_logs.append(log_entry) - - # Also log to the logger log_func = getattr(logger, level.lower(), logger.info) log_func(message) def get_logs(router: Optional['SwarmRouter'] = None) -> List[str]: - """ - Get formatted logs from both the execution logs and router logs if available. - - Args: - router: Optional SwarmRouter instance to get additional logs from - - Returns: - List of formatted log strings - """ formatted_logs = [] - - # Add execution logs for log in execution_logs: metadata_str = "" if log["metadata"]: @@ -107,12 +82,10 @@ def get_logs(router: Optional['SwarmRouter'] = None) -> List[str]: def clear_logs(): - """Clear the execution logs.""" execution_logs.clear() def load_prompts_from_json() -> Dict[str, str]: - """Robust prompt loading with comprehensive error handling.""" try: if not os.path.exists(PROMPT_JSON_PATH): error_msg = f"Prompts file not found at: {PROMPT_JSON_PATH}" @@ -131,20 +104,20 @@ def load_prompts_from_json() -> Dict[str, str]: error_msg = f"Invalid JSON in prompts file: {str(e)}" log_event("error", error_msg) raise - + if not isinstance(data, dict): error_msg = "Prompts file must contain a JSON object" log_event("error", error_msg) raise ValueError(error_msg) - + prompts = {} for agent_name, details in data.items(): if not isinstance(details, dict) or "system_prompt" not in details: log_event("warning", f"Skipping invalid agent config: {agent_name}") continue - + prompts[f"agent.{agent_name}"] = details["system_prompt"] - + if not prompts: error_msg = "No valid prompts found in prompts file" log_event("error", error_msg) @@ -154,10 +127,10 @@ def load_prompts_from_json() -> Dict[str, str]: "agent.summarizer": "You are a summarization agent...", "agent.onboarding_agent": "You are an onboarding agent..." } - + log_event("info", f"Successfully loaded {len(prompts)} prompts from JSON") return prompts - + except Exception as e: error_msg = f"Error loading prompts: {str(e)}" log_event("error", error_msg) @@ -169,15 +142,10 @@ def load_prompts_from_json() -> Dict[str, str]: } -# Load prompts AGENT_PROMPTS = load_prompts_from_json() -def initialize_agents( - data_temp: float, - sum_temp: float, - agent_keys: List[str] -) -> List[Agent]: - """Enhanced agent initialization with more robust configuration.""" + +def initialize_agents(dynamic_temp: float, agent_keys: List[str]) -> List[Agent]: agents = [] seen_names = set() for agent_key in agent_keys: @@ -186,7 +154,7 @@ def initialize_agents( agent_prompt = AGENT_PROMPTS[agent_key] agent_name = agent_key.split('.')[-1] - + # Ensure unique agent names base_name = agent_name counter = 1 @@ -194,7 +162,7 @@ def initialize_agents( agent_name = f"{base_name}_{counter}" counter += 1 seen_names.add(agent_name) - + agent = Agent( agent_name=f"Agent-{agent_name}", system_prompt=agent_prompt, @@ -207,17 +175,18 @@ def initialize_agents( user_name="pe_firm", retry_attempts=1, context_length=200000, - output_type="string", - temperature=data_temp, + output_type="string", # here is the output type which is string + temperature=dynamic_temp, ) agents.append(agent) return agents -async def execute_task(task: str, max_loops: int, data_temp: float, sum_temp: float, - swarm_type: str, agent_keys: List[str], flow: str = None) -> Tuple[Dict[str, str], 'SwarmRouter', str]: + +async def execute_task(task: str, max_loops: int, dynamic_temp: float, + swarm_type: str, agent_keys: List[str], flow: str = None) -> Tuple[Any, 'SwarmRouter', str]: """ - Enhanced task execution with comprehensive error handling and result processing. + Enhanced task execution with comprehensive error handling and raw result return. """ start_time = time.time() log_event("info", f"Starting task execution: {task}") @@ -225,14 +194,14 @@ async def execute_task(task: str, max_loops: int, data_temp: float, sum_temp: fl try: # Initialize agents try: - agents = initialize_agents(data_temp, sum_temp, agent_keys) + agents = initialize_agents(dynamic_temp, agent_keys) log_event("info", f"Successfully initialized {len(agents)} agents") except Exception as e: error_msg = f"Agent initialization error: {str(e)}" log_event("error", error_msg) - return {}, None, error_msg - - # Create a SwarmRouter to manage the different swarm types + return None, None, error_msg + + # Swarm-specific configurations router_kwargs = { "name": "multi-agent-workflow", "description": f"Executing {swarm_type} workflow", @@ -240,11 +209,21 @@ async def execute_task(task: str, max_loops: int, data_temp: float, sum_temp: fl "agents": agents, "autosave": True, "return_json": True, - "output_type": "string" + "output_type": "string", + "swarm_type": swarm_type, # Pass swarm_type here } - # Swarm-specific configurations + if swarm_type == "AgentRearrange": + if not flow: + return None, None, "Flow configuration is required for AgentRearrange" + router_kwargs["flow"] = flow + + if swarm_type == "MixtureOfAgents": + if len(agents) < 2: + return None, None, "MixtureOfAgents requires at least 2 agents" + if swarm_type == "SpreadSheetSwarm": + # spread sheet swarm needs specific setup output_dir = "swarm_outputs" os.makedirs(output_dir, exist_ok=True) @@ -262,139 +241,50 @@ async def execute_task(task: str, max_loops: int, data_temp: float, sum_temp: fl except OSError as e: error_msg = f"Invalid output path: {str(e)}" log_event("error", error_msg) - return {}, None, error_msg + return None, None, error_msg - # Create and initialize SpreadSheetSwarm - try: - swarm = SpreadSheetSwarm( - agents=agents, - max_loops=max_loops, - name="spreadsheet-swarm", - description="SpreadSheet processing workflow", - save_file_path=output_path, - workspace_dir=output_dir, - llm=model, - autosave=True - ) - except Exception as e: - error_msg = f"Failed to initialize SpreadSheetSwarm: {str(e)}" - log_event("error", error_msg) - return {}, None, error_msg + router_kwargs["output_path"] = output_path + # Create and execute SwarmRouter + try: + timeout = 450 if swarm_type != "SpreadSheetSwarm" else 900 # SpreadSheetSwarm will have different timeout. + router = SwarmRouter(**router_kwargs) - # Execute the swarm with proper error handling - try: - result = await asyncio.wait_for( - asyncio.to_thread(lambda: swarm.run(task=task)), - timeout=900 # 15 minutes timeout + if swarm_type == "AgentRearrange": + result = await asyncio.wait_for( + asyncio.to_thread(router._run, task), + timeout=timeout ) - + return result, router, "" + + result = await asyncio.wait_for( + asyncio.to_thread(router.run, task), + timeout=timeout + ) + + if swarm_type == "SpreadSheetSwarm": # Verify the output file was created if not os.path.exists(output_path): error_msg = "Output file was not created" log_event("error", error_msg) - return {}, None, error_msg + return None, None, error_msg - return { - "CSV File Path": output_path, - "Status": "Success", - "Message": "Spreadsheet processing completed successfully", - "Result": str(result) - }, swarm, "" + return output_path, router, "" - except asyncio.TimeoutError: - error_msg = "SpreadSheetSwarm execution timed out after 900 seconds" - log_event("error", error_msg) - return {}, None, error_msg - except Exception as e: - error_msg = f"SpreadSheetSwarm execution error: {str(e)}" - log_event("error", error_msg) - return {}, None, error_msg - - # Rest of the swarm type handling... - elif swarm_type == "AgentRearrange": - if not flow: - return {}, None, "Flow configuration is required for AgentRearrange" - router_kwargs["flow"] = flow - - elif swarm_type == "MixtureOfAgents": - if len(agents) < 2: - return {}, None, "MixtureOfAgents requires at least 2 agents" - router_kwargs.update({ - "aggregator_agent": agents[-1], - "layers": max_loops - }) - - # Create router and execute task for non-SpreadSheetSwarm types - if swarm_type != "SpreadSheetSwarm": - try: - timeout = 450 # Default timeout - await asyncio.sleep(0.5) - - router = SwarmRouter(**router_kwargs) - router.swarm_type = swarm_type - - result = await asyncio.wait_for( - asyncio.to_thread(router.run, task=task), - timeout=timeout - ) - - # Process results - def process_result(result): - """Process and standardize the result output.""" - if isinstance(result, str): - try: - # Attempt to parse as JSON - parsed_result = json.loads(result) - return parsed_result - except json.JSONDecodeError: - # Fallback to string result - return {"Final Output": result} - - if isinstance(result, dict): - # Clean and standardize dictionary results - return {str(k): str(v) for k, v in result.items()} - - return {"Final Output": str(result)} - - responses = process_result(result) - return responses, router, "" - - except asyncio.TimeoutError: + return result, router, "" + + except asyncio.TimeoutError: error_msg = f"Task execution timed out after {timeout} seconds" log_event("error", error_msg) - return {}, None, error_msg - except Exception as e: - error_msg = f"Task execution error: {str(e)}" - log_event("error", error_msg) - return {}, None, error_msg + return None, None, error_msg + except Exception as e: + error_msg = f"Task execution error: {str(e)}" + log_event("error", error_msg) + return None, None, error_msg except Exception as e: error_msg = f"Unexpected error in task execution: {str(e)}" log_event("error", error_msg) - return {}, None, error_msg - -def _extract_concurrent_responses(result: str, agents: List[Agent]) -> Dict[str, str]: - """ - Extract unique responses for each agent in a ConcurrentWorkflow. - - Args: - result (str): Full output from SwarmRouter - agents (List[Agent]): List of agents used in the task - - Returns: - Dict[str, str]: Unique responses for each agent - """ - agent_responses = {} - for agent in agents: - # Pattern to capture "Agent Name: ... Response: ... " format - pattern = rf"Agent Name:\s*{re.escape(agent.agent_name)}\s*Response:\s*(.+?)(?=Agent Name:|$)" - - match = re.search(pattern, result, re.DOTALL | re.IGNORECASE | re.MULTILINE) - if match: - agent_responses[agent.agent_name] = match.group(1).strip() - else: - agent_responses[agent.agent_name] = "No response from the Agent" - return agent_responses + return None, None, error_msg class UI: @@ -438,7 +328,7 @@ class UI: choices = ["No options available"] if value is None and choices: value = choices[0] if not multiselect else [choices[0]] - + dropdown = gr.Dropdown( label=label, choices=choices, @@ -448,7 +338,7 @@ class UI: ) self.components[f'dropdown_{label}'] = dropdown return dropdown - + def create_button(self, text, variant="primary"): button = gr.Button(text, variant=variant) self.components[f'button_{text}'] = button @@ -467,20 +357,20 @@ class UI: def create_tab(self, label, content_function): with gr.Tab(label): - content_function(self) + content_function(self) def set_event_listener(self, button, function, inputs, outputs): - button.click(function, inputs=inputs, outputs=outputs) + button.click(function, inputs=inputs, outputs=outputs) def get_components(self, *keys): - if not keys: - return self.components # return all components - return [self.components[key] for key in keys] - + if not keys: + return self.components # return all components + return [self.components[key] for key in keys] + def create_json_output(self, label, placeholder=""): json_output = gr.JSON( label=label, - value = {}, + value={}, elem_classes=["custom-output"], ) self.components[f'json_output_{label}'] = json_output @@ -496,22 +386,22 @@ class UI: inputs=[watch_component], outputs=[component] ) - + @staticmethod def create_ui_theme(primary_color="red"): return gr.themes.Soft( - primary_hue=primary_color, - secondary_hue="gray", - neutral_hue="gray", - ).set( - body_background_fill="#20252c", - body_text_color="#f0f0f0", - button_primary_background_fill=primary_color, - button_primary_text_color="#ffffff", - button_secondary_background_fill=primary_color, - button_secondary_text_color="#ffffff", - shadow_drop="0px 2px 4px rgba(0, 0, 0, 0.3)", - ) + primary_hue=primary_color, + secondary_hue="gray", + neutral_hue="gray", + ).set( + body_background_fill="#20252c", + body_text_color="#f0f0f0", + button_primary_background_fill=primary_color, + button_primary_text_color="#ffffff", + button_secondary_background_fill=primary_color, + button_secondary_text_color="#ffffff", + shadow_drop="0px 2px 4px rgba(0, 0, 0, 0.3)", + ) def create_agent_details_tab(self): """Create the agent details tab content.""" @@ -545,6 +435,7 @@ class UI: ) return logs_display + def create_app(): # Initialize UI theme = UI.create_ui_theme(primary_color="red") @@ -561,7 +452,7 @@ def create_app(): with gr.Column(scale=4): with gr.Row(): task_input = gr.Textbox( - label="Task Description", + label="Task Description", placeholder="Describe your task here...", lines=3 ) @@ -589,7 +480,7 @@ def create_app(): multiselect=False, interactive=True ) - + # Flow configuration components for AgentRearrange with gr.Column(visible=False) as flow_config: flow_text = gr.Textbox( @@ -643,7 +534,7 @@ def create_app(): interactive=False, lines=10 ) - + def update_flow_agents(agent_keys): """Update flow agents based on selected agent prompts.""" if not agent_keys: @@ -666,10 +557,10 @@ def create_app(): is_agent_rearrange = swarm_type == "AgentRearrange" is_mixture = swarm_type == "MixtureOfAgents" is_spreadsheet = swarm_type == "SpreadSheetSwarm" - + max_loops = 5 if is_mixture or is_spreadsheet else 10 log_event("info", f"Swarm type changed to {swarm_type}, max loops set to {max_loops}") - + # Return visibility state for flow configuration and max loops update return ( gr.update(visible=is_agent_rearrange), # For flow_config @@ -677,22 +568,22 @@ def create_app(): f"Selected {swarm_type}" # For loading_status ) - async def run_task_wrapper(task, max_loops, data_temp, swarm_type, agent_prompt_selector, flow_text, sum_temp): + async def run_task_wrapper(task, max_loops, dynamic_temp, swarm_type, agent_prompt_selector, flow_text): """Execute the task and update the UI with progress.""" try: if not task: yield "Please provide a task description.", "Error: Missing task" return - + if not agent_prompt_selector or len(agent_prompt_selector) == 0: yield "Please select at least one agent.", "Error: No agents selected" return - + log_event("info", f"Starting task with agents: {agent_prompt_selector}") - + # Update status yield "Processing...", "Running task..." - + # Prepare flow for AgentRearrange flow = None if swarm_type == "AgentRearrange": @@ -700,49 +591,45 @@ def create_app(): yield "Please provide the agent flow configuration.", "Error: Flow not configured" return flow = flow_text - + # Execute task - responses, router, error = await execute_task( + result, router, error = await execute_task( task=task, max_loops=max_loops, - data_temp=data_temp, - sum_temp=sum_temp, + dynamic_temp=dynamic_temp, swarm_type=swarm_type, agent_keys=agent_prompt_selector, flow=flow ) - + if error: yield f"Error: {error}", "Error occurred" return - + # Format output based on swarm type output_lines = [] - if router.swarm_type == "AgentRearrange": - for key, value in responses.items(): - output_lines.append(f"### Step {key} ###\n{value}\n{'='*50}\n") - elif router.swarm_type == "MixtureOfAgents": - # Add individual agent outputs - for key, value in responses.items(): - if key != "Aggregated Summary": - output_lines.append(f"### {key} ###\n{value}\n") - # Add aggregated summary at the end - if "Aggregated Summary" in responses: - output_lines.append(f"\n### Aggregated Summary ###\n{responses['Aggregated Summary']}\n{'='*50}\n") - elif router.swarm_type == "SpreadSheetSwarm": - output_lines.append(f"### Spreadsheet Output ###\n{responses.get('CSV File Path', 'No file path provided')}\n{'='*50}\n") - elif router.swarm_type == "ConcurrentWorkflow": - for key, value in responses.items(): - output_lines.append(f"### {key} ###\n{value}\n{'='*50}\n") - else: # SequentialWorkflow, auto - if isinstance(responses, dict): - for key, value in responses.items(): - output_lines.append(f"### {key} ###\n{value}\n{'='*50}\n") - else: - output_lines.append(str(responses)) - + if swarm_type == "SpreadSheetSwarm": + output_lines.append(f"### Spreadsheet Output ###\n{result}\n{'=' * 50}\n") + elif isinstance(result, dict): # checking if result is dict or string. + if swarm_type == "AgentRearrange": + for key, value in result.items(): + output_lines.append(f"### Step {key} ###\n{value}\n{'=' * 50}\n") + elif swarm_type == "MixtureOfAgents": + # Add individual agent outputs + for key, value in result.items(): + if key != "Aggregated Summary": + output_lines.append(f"### {key} ###\n{value}\n") + # Add aggregated summary at the end + if "Aggregated Summary" in result: + output_lines.append(f"\n### Aggregated Summary ###\n{result['Aggregated Summary']}\n{'=' * 50}\n") + else: # SequentialWorkflow, ConcurrentWorkflow, Auto + for key, value in result.items(): + output_lines.append(f"### {key} ###\n{value}\n{'=' * 50}\n") + elif isinstance(result, str): + output_lines.append(str(result)) + yield "\n".join(output_lines), "Completed" - + except Exception as e: error_msg = f"Error: {str(e)}" log_event("error", error_msg) @@ -774,7 +661,7 @@ def create_app(): def cancel_task(): log_event("info", "Task cancelled by user") return "Task cancelled.", "Cancelled" - + cancel_button.click( fn=cancel_task, inputs=None, @@ -782,37 +669,26 @@ def create_app(): cancels=run_event ) - # with gr.Column(scale=1): # Right column - # with gr.Tabs(): - # with gr.Tab("Agent Details"): - # gr.Markdown(""" - # ### Available Agent Types - # - **Data Extraction Agent**: Specialized in extracting relevant information - # - **Summary Agent**: Creates concise summaries of information - # - **Analysis Agent**: Performs detailed analysis of data - - with gr.Column(scale=1): # Right column with gr.Tabs(): with gr.Tab("Agent Details"): ui.create_agent_details_tab() - + with gr.Tab("Logs"): logs_display = ui.create_logs_tab() + def update_logs_display(): """Update logs display with current logs.""" logs = get_logs() formatted_logs = "\n".join(logs) return formatted_logs - + # Update logs when tab is selected logs_tab = gr.Tab("Logs") logs_tab.select(fn=update_logs_display, inputs=None, outputs=[logs_display]) - return ui.build() -# Launch the app if __name__ == "__main__": app = create_app() app.launch() \ No newline at end of file