diff --git a/swarms/structs/ui/agent_prompts.json b/swarms/structs/ui/agent_prompts.json index 0a3bd3cc..37f4a510 100644 --- a/swarms/structs/ui/agent_prompts.json +++ b/swarms/structs/ui/agent_prompts.json @@ -46,5 +46,11 @@ }, "Hello": { "system_prompt": "Hi I love coding" + }, + "Tech_bot": { + "system_prompt": "You are an Tech Assistant, your role is to assist the user with tech related information to what he is purchasing laptop,smartphones,etc guide him properly !" + }, + "test": { + "system_prompt": "test" } } \ No newline at end of file diff --git a/swarms/structs/ui/ui.py b/swarms/structs/ui/ui.py index dcfb19b6..bc9f0cc0 100644 --- a/swarms/structs/ui/ui.py +++ b/swarms/structs/ui/ui.py @@ -1,19 +1,19 @@ import os from dotenv import load_dotenv -from typing import List, Dict, Any, Tuple, Optional +from typing import AsyncGenerator, List, Dict, Any, Tuple, Optional import json import time import asyncio import gradio as gr from swarms.structs.agent import Agent from swarms.structs.swarm_router import SwarmRouter -# from swarms.structs.rearrange import AgentRearrange from swarms.utils.loguru_logger import initialize_logger import re import csv # Import the csv module for csv parsing from swarms.utils.litellm_wrapper import LiteLLM from litellm import models_by_provider from dotenv import set_key, find_dotenv +import logging # Import the logging module # Initialize logger load_dotenv() @@ -21,6 +21,7 @@ load_dotenv() # Initialize logger logger = initialize_logger(log_folder="swarm_ui") + # Define the path to agent_prompts.json PROMPT_JSON_PATH = os.path.join( os.path.dirname(os.path.abspath(__file__)), "agent_prompts.json" @@ -28,7 +29,6 @@ PROMPT_JSON_PATH = os.path.join( logger.info(f"Loading prompts from: {PROMPT_JSON_PATH}") # Load prompts first so its available for create_app - def load_prompts_from_json() -> Dict[str, str]: try: if not os.path.exists(PROMPT_JSON_PATH): @@ -86,8 +86,10 @@ def load_prompts_from_json() -> Dict[str, str]: "Agent-Onboarding_Agent": "You are an onboarding agent...", } + AGENT_PROMPTS = load_prompts_from_json() + def initialize_agents( dynamic_temp: float, agent_keys: List[str], @@ -97,61 +99,78 @@ def initialize_agents( temperature: float, max_tokens: int, ) -> List[Agent]: + logger.info("Initializing agents...") agents = [] seen_names = set() - for agent_key in agent_keys: - if agent_key not in AGENT_PROMPTS: - raise ValueError(f"Invalid agent key: {agent_key}") - - agent_prompt = AGENT_PROMPTS[agent_key] - agent_name = agent_key - - # Ensure unique agent names - base_name = agent_name - counter = 1 - while agent_name in seen_names: - agent_name = f"{base_name}_{counter}" - counter += 1 - seen_names.add(agent_name) - - llm = LiteLLM( - model_name=model_name, - system_prompt=agent_prompt, - temperature=temperature, - max_tokens=max_tokens, - ) - - agent = Agent( - agent_name=agent_name, - system_prompt=agent_prompt, - llm=llm, - max_loops=1, - autosave=True, - verbose=True, - dynamic_temperature_enabled=True, - saved_state_path=f"agent_{agent_name}.json", - user_name="pe_firm", - retry_attempts=1, - context_length=200000, - output_type="string", # here is the output type which is string - temperature=dynamic_temp, - ) - print( - f"Agent created: {agent.agent_name}" - ) # Debug: Print agent name - agents.append(agent) - - return agents + try: + for agent_key in agent_keys: + if agent_key not in AGENT_PROMPTS: + raise ValueError(f"Invalid agent key: {agent_key}") + + agent_prompt = AGENT_PROMPTS[agent_key] + agent_name = agent_key + + # Ensure unique agent names + base_name = agent_name + counter = 1 + while agent_name in seen_names: + agent_name = f"{base_name}_{counter}" + counter += 1 + seen_names.add(agent_name) + + llm = LiteLLM( + model_name=model_name, + system_prompt=agent_prompt, + temperature=temperature, + max_tokens=max_tokens, + ) + + agent = Agent( + agent_name=agent_name, + system_prompt=agent_prompt, + llm=llm, + max_loops=1, + autosave=True, + verbose=True, + dynamic_temperature_enabled=True, + saved_state_path=f"agent_{agent_name}.json", + user_name="pe_firm", + retry_attempts=1, + context_length=200000, + output_type="string", # here is the output type which is string + temperature=dynamic_temp, + ) + print( + f"Agent created: {agent.agent_name}" + ) # Debug: Print agent name + agents.append(agent) + logger.info(f"Agents initialized successfully: {[agent.agent_name for agent in agents]}") + return agents + except Exception as e: + logger.error(f"Error initializing agents: {e}", exc_info=True) + raise def validate_flow(flow, agents_dict): + logger.info(f"Validating flow: {flow}") agent_names = flow.split("->") for agent in agent_names: agent = agent.strip() if agent not in agents_dict: + logger.error(f"Agent '{agent}' specified in the flow does not exist.") raise ValueError( f"Agent '{agent}' specified in the flow does not exist." ) + logger.info(f"Flow validated successfully: {flow}") + +class TaskExecutionError(Exception): + """Custom exception for task execution errors.""" + def __init__(self, message: str): + self.message = message + super().__init__(self.message) + def __str__(self): + return f"TaskExecutionError: {self.message}" + async def execute_task( task: str, max_loops: int, @@ -165,8 +184,32 @@ async def execute_task( temperature: float = 0.5, max_tokens: int = 4000, agents: dict = None, -) -> Tuple[Any, "SwarmRouter", str]: # Changed the return type here + log_display=None, + error_display=None +) -> AsyncGenerator[Tuple[Any, Optional["SwarmRouter"], str], None]: # Changed the return type here + logger.info(f"Executing task: {task} with swarm type: {swarm_type}") try: + if not task: + logger.error("Task description is missing.") + yield "Please provide a task description.", gr.update(visible=True), "" + return + if not agent_keys: + logger.error("No agents selected.") + yield "Please select at least one agent.", gr.update(visible=True), "" + return + if not provider: + logger.error("Provider is missing.") + yield "Please select a provider.", gr.update(visible=True), "" + return + if not model_name: + logger.error("Model is missing.") + yield "Please select a model.", gr.update(visible=True), "" + return + if not api_key: + logger.error("API Key is missing.") + yield "Please enter an API Key.", gr.update(visible=True), "" + return + # Initialize agents try: if not agents: @@ -180,7 +223,9 @@ async def execute_task( max_tokens, ) except Exception as e: - return None, None, str(e) + logger.error(f"Error initializing agents: {e}", exc_info=True) + yield f"Error initializing agents: {e}", gr.update(visible=True), "" + return # Swarm-specific configurations router_kwargs = { @@ -196,11 +241,10 @@ async def execute_task( if swarm_type == "AgentRearrange": if not flow: - return ( - None, - None, - "Flow configuration is required for AgentRearrange", - ) + logger.error("Flow configuration is missing for AgentRearrange.") + yield "Flow configuration is required for AgentRearrange", gr.update(visible=True), "" + return + # Generate unique agent names in the flow flow_agents = [] @@ -217,27 +261,22 @@ async def execute_task( # Update the flow string with unique names flow = " -> ".join(flow_agents) - print(f"Updated Flow string: {flow}") + logger.info(f"Updated Flow string: {flow}") router_kwargs["flow"] = flow router_kwargs["output_type"] = "string" # Changed output type here - - + if swarm_type == "MixtureOfAgents": if len(agents) < 2: - return ( - None, - None, - "MixtureOfAgents requires at least 2 agents", - ) + logger.error("MixtureOfAgents requires at least 2 agents.") + yield "MixtureOfAgents requires at least 2 agents", gr.update(visible=True), "" + return if swarm_type == "SequentialWorkflow": if len(agents) < 2: - return ( - None, - None, - "SequentialWorkflow requires at least 2 agents", - ) + logger.error("SequentialWorkflow requires at least 2 agents.") + yield "SequentialWorkflow requires at least 2 agents", gr.update(visible=True), "" + return if swarm_type == "ConcurrentWorkflow": pass @@ -255,11 +294,6 @@ async def execute_task( ) # SpreadSheetSwarm will have different timeout. if swarm_type == "AgentRearrange": - # result = await asyncio.wait_for( - # asyncio.to_thread(router._run, task), - # timeout=timeout - # ) - # return result, router, "" from swarms.structs.rearrange import AgentRearrange router = AgentRearrange( agents=list(agents.values()), @@ -270,9 +304,11 @@ async def execute_task( # autosave=True, return_json=True, output_type="string", # Changed output type according to agent rearrange - ) + ) result = router(task) # Changed run method - return result, router, "" + logger.info(f"AgentRearrange task executed successfully.") + yield result, None, "" + return # For other swarm types use the SwarmRouter and its run method router = SwarmRouter(**router_kwargs) # Initialize SwarmRouter @@ -290,31 +326,324 @@ async def execute_task( result[agent.agent_name] = response # Convert the result to JSON string for parsing - result = json.dumps(result) - return result, router, "" + result = json.dumps( + { + "input" : { + "swarm_id" : "concurrent_workflow_swarm_id", + "name" : "ConcurrentWorkflow", + "flow" : "->".join([agent.agent_name for agent in list(agents.values())]) + }, + "time" : time.time(), + "outputs" : [ + { + "agent_name": agent_name, + "steps" : [{"role":"assistant", "content":response}] + } for agent_name, response in result.items() + ] + } + ) + logger.info(f"ConcurrentWorkflow task executed successfully.") + yield result, None, "" + return + elif swarm_type == "auto": + result = await asyncio.wait_for( + asyncio.to_thread(router.run, task), + timeout=timeout + ) + if isinstance(result,dict): + result = json.dumps( + { + "input" : { + "swarm_id" : "auto_swarm_id", + "name" : "AutoSwarm", + "flow" : "->".join([agent.agent_name for agent in list(agents.values())]) + }, + "time" : time.time(), + "outputs" : [ + { + "agent_name": agent.agent_name, + "steps" : [{"role":"assistant", "content":response}] + } for agent, response in result.items() + ] + } + ) + elif isinstance(result, str): + result = json.dumps( + { + "input" : { + "swarm_id" : "auto_swarm_id", + "name" : "AutoSwarm", + "flow" : "->".join([agent.agent_name for agent in list(agents.values())]) + }, + "time" : time.time(), + "outputs" : [ + { + "agent_name": "auto", + "steps" : [{"role":"assistant", "content":result}] + } + ] + } + ) + else : + logger.error("Auto Swarm returned an unexpected type") + yield "Error : Auto Swarm returned an unexpected type", gr.update(visible=True), "" + return + logger.info(f"Auto task executed successfully.") + yield result, None, "" + return else: result = await asyncio.wait_for( asyncio.to_thread(router.run, task), timeout=timeout ) - return result, router, "" - except asyncio.TimeoutError: - return ( - None, - None, - f"Task execution timed out after {timeout} seconds", - ) + logger.info(f"{swarm_type} task executed successfully.") + yield result, None, "" + return + except asyncio.TimeoutError as e: + logger.error(f"Task execution timed out after {timeout} seconds", exc_info=True) + yield f"Task execution timed out after {timeout} seconds", gr.update(visible=True), "" + return except Exception as e: - return None, None, str(e) + logger.error(f"Error executing task: {e}", exc_info=True) + yield f"Error executing task: {e}", gr.update(visible=True), "" + return + + except TaskExecutionError as e: + logger.error(f"Task execution error: {e}") + yield str(e), gr.update(visible=True), "" + return + except Exception as e: + logger.error(f"An unexpected error occurred: {e}", exc_info=True) + yield f"An unexpected error occurred: {e}", gr.update(visible=True), "" + return + finally: + logger.info(f"Task execution finished for: {task} with swarm type: {swarm_type}") + + +def format_output(data:Optional[str], swarm_type:str, error_display=None) -> str: + if data is None: + return "Error : No output from the swarm." + if swarm_type == "AgentRearrange": + return parse_agent_rearrange_output(data, error_display) + elif swarm_type == "MixtureOfAgents": + return parse_mixture_of_agents_output(data, error_display) + elif swarm_type in ["SequentialWorkflow", "ConcurrentWorkflow"]: + return parse_sequential_workflow_output(data, error_display) + elif swarm_type == "SpreadSheetSwarm": + if os.path.exists(data): + return parse_spreadsheet_swarm_output(data, error_display) + else: + return parse_json_output(data, error_display) + elif swarm_type == "auto": + return parse_auto_swarm_output(data, error_display) + else: + return "Unsupported swarm type." + +def parse_mixture_of_agents_data(data: dict, error_display=None) -> str: + """Parses the MixtureOfAgents output data and formats it for display.""" + logger.info("Parsing MixtureOfAgents data within Auto Swarm output...") + + try: + output = "" + if "InputConfig" in data and isinstance(data["InputConfig"], dict): + input_config = data["InputConfig"] + output += f"Mixture of Agents Workflow Details\n\n" + output += f"Name: `{input_config.get('name', 'N/A')}`\n" + output += ( + f"Description:" + f" `{input_config.get('description', 'N/A')}`\n\n---\n" + ) + output += f"Agent Task Execution\n\n" + + for agent in input_config.get("agents", []): + output += ( + f"Agent: `{agent.get('agent_name', 'N/A')}`\n" + ) + + if "normal_agent_outputs" in data and isinstance( + data["normal_agent_outputs"], list + ): + for i, agent_output in enumerate( + data["normal_agent_outputs"], start=3 + ): + agent_name = agent_output.get("agent_name", "N/A") + output += f"Run {(3 - i)} (Agent: `{agent_name}`)\n\n" + for j, step in enumerate( + agent_output.get("steps", []), start=3 + ): + if ( + isinstance(step, dict) + and "role" in step + and "content" in step + and step["role"].strip() != "System:" + ): + content = step["content"] + output += f"Step {(3 - j)}: \n" + output += f"Response:\n {content}\n\n" + + if "aggregator_agent_summary" in data: + output += ( + f"\nAggregated Summary :\n" + f"{data['aggregator_agent_summary']}\n{'=' * 50}\n" + ) + + logger.info("MixtureOfAgents data parsed successfully within Auto Swarm.") + return output + + except Exception as e: + logger.error( + f"Error during parsing MixtureOfAgents data within Auto Swarm: {e}", + exc_info=True, + ) + return f"Error during parsing: {str(e)}" + +def parse_auto_swarm_output(data: Optional[str], error_display=None) -> str: + """Parses the auto swarm output string and formats it for display.""" + logger.info("Parsing Auto Swarm output...") + if data is None: + logger.error("No data provided for parsing Auto Swarm output.") + return "Error: No data provided for parsing." + + print(f"Raw data received for parsing:\n{data}") # Debug: Print raw data + + try: + parsed_data = json.loads(data) + errors = [] + + # Basic structure validation + if ( + "input" not in parsed_data + or not isinstance(parsed_data.get("input"), dict) + ): + errors.append( + "Error: 'input' data is missing or not a dictionary." + ) + else: + if "swarm_id" not in parsed_data["input"]: + errors.append( + "Error: 'swarm_id' key is missing in the 'input'." + ) + if "name" not in parsed_data["input"]: + errors.append( + "Error: 'name' key is missing in the 'input'." + ) + if "flow" not in parsed_data["input"]: + errors.append( + "Error: 'flow' key is missing in the 'input'." + ) + + if "time" not in parsed_data: + errors.append("Error: 'time' key is missing.") + + if errors: + logger.error( + f"Errors found while parsing Auto Swarm output: {errors}" + ) + return "\n".join(errors) + + swarm_id = parsed_data["input"]["swarm_id"] + swarm_name = parsed_data["input"]["name"] + agent_flow = parsed_data["input"]["flow"] + overall_time = parsed_data["time"] + + output = f"Workflow Execution Details\n\n" + output += f"Swarm ID: `{swarm_id}`\n" + output += f"Swarm Name: `{swarm_name}`\n" + output += f"Agent Flow: `{agent_flow}`\n\n---\n" + output += f"Agent Task Execution\n\n" + + # Handle nested MixtureOfAgents data + if ( + "outputs" in parsed_data + and isinstance(parsed_data["outputs"], list) + and parsed_data["outputs"] + and isinstance(parsed_data["outputs"][0], dict) + and parsed_data["outputs"][0].get("agent_name") == "auto" + ): + mixture_data = parsed_data["outputs"][0].get("steps", []) + if mixture_data and isinstance(mixture_data[0], dict) and "content" in mixture_data[0]: + try: + mixture_content = json.loads(mixture_data[0]["content"]) + output += parse_mixture_of_agents_data(mixture_content) + except json.JSONDecodeError as e: + logger.error(f"Error decoding nested MixtureOfAgents data: {e}", exc_info=True) + return f"Error decoding nested MixtureOfAgents data: {e}" + else : + for i, agent_output in enumerate(parsed_data["outputs"], start=3): + if not isinstance(agent_output, dict): + errors.append(f"Error: Agent output at index {i} is not a dictionary") + continue + if "agent_name" not in agent_output: + errors.append(f"Error: 'agent_name' key is missing at index {i}") + continue + if "steps" not in agent_output: + errors.append(f"Error: 'steps' key is missing at index {i}") + continue + if agent_output["steps"] is None: + errors.append(f"Error: 'steps' data is None at index {i}") + continue + if not isinstance(agent_output["steps"], list): + errors.append(f"Error: 'steps' data is not a list at index {i}") + continue + + + agent_name = agent_output["agent_name"] + output += f"Run {(3-i)} (Agent: `{agent_name}`)\n\n" + + # Iterate over steps + for j, step in enumerate(agent_output["steps"], start=3): + if not isinstance(step, dict): + errors.append(f"Error: step at index {j} is not a dictionary at {i} agent output.") + continue + if step is None: + errors.append(f"Error: step at index {j} is None at {i} agent output") + continue + + if "role" not in step: + errors.append(f"Error: 'role' key missing at step {j} at {i} agent output.") + continue + + if "content" not in step: + errors.append(f"Error: 'content' key missing at step {j} at {i} agent output.") + continue + + if step["role"].strip() != "System:": # Filter out system prompts + content = step["content"] + output += f"Step {(3-j)}:\n" + output += f"Response : {content}\n\n" + + output += f"Overall Completion Time: `{overall_time}`" + + if errors: + logger.error( + f"Errors found while parsing Auto Swarm output: {errors}" + ) + return "\n".join(errors) + + logger.info("Auto Swarm output parsed successfully.") + return output + + except json.JSONDecodeError as e: + logger.error( + f"Error during parsing Auto Swarm output: {e}", exc_info=True + ) + return f"Error during parsing json.JSONDecodeError: {e}" except Exception as e: - return None, None, str(e) + logger.error( + f"Error during parsing Auto Swarm output: {e}", exc_info=True + ) + return f"Error during parsing: {str(e)}" + -def parse_agent_rearrange_output(data: Optional[str]) -> str: + +def parse_agent_rearrange_output(data: Optional[str], error_display=None) -> str: """ Parses the AgentRearrange output string and formats it for display. """ + logger.info("Parsing AgentRearrange output...") if data is None: + logger.error("No data provided for parsing AgentRearrange output.") return "Error: No data provided for parsing." print( @@ -352,6 +681,7 @@ def parse_agent_rearrange_output(data: Optional[str]) -> str: errors.append("Error: 'time' key is missing.") if errors: + logger.error(f"Errors found while parsing AgentRearrange output: {errors}") return "\n".join(errors) swarm_id = parsed_data["input"]["swarm_id"] @@ -375,6 +705,7 @@ def parse_agent_rearrange_output(data: Optional[str]) -> str: errors.append("Error: 'outputs' list is empty.") if errors: + logger.error(f"Errors found while parsing AgentRearrange output: {errors}") return "\n".join(errors) for i, agent_output in enumerate( @@ -460,20 +791,26 @@ def parse_agent_rearrange_output(data: Optional[str]) -> str: # output += "\n\n---\n" output += f"Overall Completion Time: `{overall_time}`" - if errors: + logger.error(f"Errors found while parsing AgentRearrange output: {errors}") return "\n".join(errors) else: + logger.info("AgentRearrange output parsed successfully.") return output except json.JSONDecodeError as e: + logger.error(f"Error during parsing AgentRearrange output: {e}", exc_info=True) return f"Error during parsing: json.JSONDecodeError {e}" except Exception as e: + logger.error(f"Error during parsing AgentRearrange output: {e}", exc_info=True) return f"Error during parsing: {str(e)}" -def parse_mixture_of_agents_output(data: Optional[str]) -> str: + +def parse_mixture_of_agents_output(data: Optional[str], error_display=None) -> str: """Parses the MixtureOfAgents output string and formats it for display.""" + logger.info("Parsing MixtureOfAgents output...") if data is None: + logger.error("No data provided for parsing MixtureOfAgents output.") return "Error: No data provided for parsing." print(f"Raw data received for parsing:\n{data}") # Debug: Print raw data @@ -482,15 +819,19 @@ def parse_mixture_of_agents_output(data: Optional[str]) -> str: parsed_data = json.loads(data) if "InputConfig" not in parsed_data or not isinstance(parsed_data["InputConfig"], dict): + logger.error("Error: 'InputConfig' data is missing or not a dictionary.") return "Error: 'InputConfig' data is missing or not a dictionary." if "name" not in parsed_data["InputConfig"]: - return "Error: 'name' key is missing in 'InputConfig'." + logger.error("Error: 'name' key is missing in 'InputConfig'.") + return "Error: 'name' key is missing in 'InputConfig'." if "description" not in parsed_data["InputConfig"]: + logger.error("Error: 'description' key is missing in 'InputConfig'.") return "Error: 'description' key is missing in 'InputConfig'." if "agents" not in parsed_data["InputConfig"] or not isinstance(parsed_data["InputConfig"]["agents"], list) : - return "Error: 'agents' key is missing in 'InputConfig' or not a list." + logger.error("Error: 'agents' key is missing in 'InputConfig' or not a list.") + return "Error: 'agents' key is missing in 'InputConfig' or not a list." name = parsed_data["InputConfig"]["name"] @@ -503,11 +844,14 @@ def parse_mixture_of_agents_output(data: Optional[str]) -> str: for agent in parsed_data["InputConfig"]["agents"]: if not isinstance(agent, dict): + logger.error("Error: agent is not a dict in InputConfig agents") return "Error: agent is not a dict in InputConfig agents" if "agent_name" not in agent: + logger.error("Error: 'agent_name' key is missing in agents.") return "Error: 'agent_name' key is missing in agents." if "system_prompt" not in agent: + logger.error("Error: 'system_prompt' key is missing in agents.") return f"Error: 'system_prompt' key is missing in agents." agent_name = agent["agent_name"] @@ -516,19 +860,25 @@ def parse_mixture_of_agents_output(data: Optional[str]) -> str: # output += f"* **System Prompt:** `{system_prompt}`\n\n" if "normal_agent_outputs" not in parsed_data or not isinstance(parsed_data["normal_agent_outputs"], list) : + logger.error("Error: 'normal_agent_outputs' key is missing or not a list.") return "Error: 'normal_agent_outputs' key is missing or not a list." for i, agent_output in enumerate(parsed_data["normal_agent_outputs"], start=3): if not isinstance(agent_output, dict): + logger.error(f"Error: agent output at index {i} is not a dictionary.") return f"Error: agent output at index {i} is not a dictionary." if "agent_name" not in agent_output: - return f"Error: 'agent_name' key is missing at index {i}" + logger.error(f"Error: 'agent_name' key is missing at index {i}") + return f"Error: 'agent_name' key is missing at index {i}" if "steps" not in agent_output: + logger.error(f"Error: 'steps' key is missing at index {i}") return f"Error: 'steps' key is missing at index {i}" if agent_output["steps"] is None: + logger.error(f"Error: 'steps' is None at index {i}") return f"Error: 'steps' is None at index {i}" if not isinstance(agent_output["steps"], list): + logger.error(f"Error: 'steps' data is not a list at index {i}.") return f"Error: 'steps' data is not a list at index {i}." agent_name = agent_output["agent_name"] @@ -536,16 +886,20 @@ def parse_mixture_of_agents_output(data: Optional[str]) -> str: # output += "
\nShow/Hide Agent Steps\n\n" for j, step in enumerate(agent_output["steps"], start=3): if not isinstance(step, dict): - return f"Error: step at index {j} is not a dictionary at {i} agent output." + logger.error(f"Error: step at index {j} is not a dictionary at {i} agent output.") + return f"Error: step at index {j} is not a dictionary at {i} agent output." if step is None: + logger.error(f"Error: step at index {j} is None at {i} agent output.") return f"Error: step at index {j} is None at {i} agent output." if "role" not in step: - return f"Error: 'role' key missing at step {j} at {i} agent output." + logger.error(f"Error: 'role' key missing at step {j} at {i} agent output.") + return f"Error: 'role' key missing at step {j} at {i} agent output." if "content" not in step: - return f"Error: 'content' key missing at step {j} at {i} agent output." + logger.error(f"Error: 'content' key missing at step {j} at {i} agent output.") + return f"Error: 'content' key missing at step {j} at {i} agent output." if step["role"].strip() != "System:": # Filter out system prompts # role = step["role"] @@ -557,19 +911,24 @@ def parse_mixture_of_agents_output(data: Optional[str]) -> str: if "aggregator_agent_summary" in parsed_data: output += f"\nAggregated Summary :\n{parsed_data['aggregator_agent_summary']}\n{'=' * 50}\n" - + logger.info("MixtureOfAgents output parsed successfully.") return output except json.JSONDecodeError as e: + logger.error(f"Error during parsing MixtureOfAgents output: {e}", exc_info=True) return f"Error during parsing json.JSONDecodeError : {e}" except Exception as e: + logger.error(f"Error during parsing MixtureOfAgents output: {e}", exc_info=True) return f"Error during parsing: {str(e)}" -def parse_sequential_workflow_output(data: Optional[str]) -> str: + +def parse_sequential_workflow_output(data: Optional[str], error_display=None) -> str: """Parses the SequentialWorkflow output string and formats it for display.""" + logger.info("Parsing SequentialWorkflow output...") if data is None: - return "Error: No data provided for parsing." + logger.error("No data provided for parsing SequentialWorkflow output.") + return "Error: No data provided for parsing." print(f"Raw data received for parsing:\n{data}") # Debug: Print raw data @@ -577,19 +936,24 @@ def parse_sequential_workflow_output(data: Optional[str]) -> str: parsed_data = json.loads(data) if "input" not in parsed_data or not isinstance(parsed_data.get("input"), dict): + logger.error("Error: 'input' data is missing or not a dictionary.") return "Error: 'input' data is missing or not a dictionary." if "swarm_id" not in parsed_data["input"] : - return "Error: 'swarm_id' key is missing in the 'input'." + logger.error("Error: 'swarm_id' key is missing in the 'input'.") + return "Error: 'swarm_id' key is missing in the 'input'." if "name" not in parsed_data["input"]: + logger.error("Error: 'name' key is missing in the 'input'.") return "Error: 'name' key is missing in the 'input'." if "flow" not in parsed_data["input"]: - return "Error: 'flow' key is missing in the 'input'." + logger.error("Error: 'flow' key is missing in the 'input'.") + return "Error: 'flow' key is missing in the 'input'." if "time" not in parsed_data : - return "Error: 'time' key is missing." + logger.error("Error: 'time' key is missing.") + return "Error: 'time' key is missing." swarm_id = parsed_data["input"]["swarm_id"] swarm_name = parsed_data["input"]["name"] @@ -603,29 +967,37 @@ def parse_sequential_workflow_output(data: Optional[str]) -> str: output += f"Agent Task Execution\n\n" if "outputs" not in parsed_data: - return "Error: 'outputs' key is missing" + logger.error("Error: 'outputs' key is missing") + return "Error: 'outputs' key is missing" if parsed_data["outputs"] is None: + logger.error("Error: 'outputs' data is None") return "Error: 'outputs' data is None" if not isinstance(parsed_data["outputs"], list): + logger.error("Error: 'outputs' data is not a list.") return "Error: 'outputs' data is not a list." for i, agent_output in enumerate(parsed_data["outputs"], start=3): if not isinstance(agent_output, dict): - return f"Error: Agent output at index {i} is not a dictionary" + logger.error(f"Error: Agent output at index {i} is not a dictionary") + return f"Error: Agent output at index {i} is not a dictionary" if "agent_name" not in agent_output: + logger.error(f"Error: 'agent_name' key is missing at index {i}") return f"Error: 'agent_name' key is missing at index {i}" if "steps" not in agent_output: - return f"Error: 'steps' key is missing at index {i}" + logger.error(f"Error: 'steps' key is missing at index {i}") + return f"Error: 'steps' key is missing at index {i}" if agent_output["steps"] is None: - return f"Error: 'steps' data is None at index {i}" + logger.error(f"Error: 'steps' data is None at index {i}") + return f"Error: 'steps' data is None at index {i}" if not isinstance(agent_output["steps"], list): - return f"Error: 'steps' data is not a list at index {i}" + logger.error(f"Error: 'steps' data is not a list at index {i}") + return f"Error: 'steps' data is not a list at index {i}" agent_name = agent_output["agent_name"] output += f"Run {(3-i)} (Agent: `{agent_name}`)\n\n" @@ -634,15 +1006,19 @@ def parse_sequential_workflow_output(data: Optional[str]) -> str: # Iterate over steps for j, step in enumerate(agent_output["steps"], start=3): if not isinstance(step, dict): + logger.error(f"Error: step at index {j} is not a dictionary at {i} agent output.") return f"Error: step at index {j} is not a dictionary at {i} agent output." if step is None: + logger.error(f"Error: step at index {j} is None at {i} agent output") return f"Error: step at index {j} is None at {i} agent output" if "role" not in step: + logger.error(f"Error: 'role' key missing at step {j} at {i} agent output.") return f"Error: 'role' key missing at step {j} at {i} agent output." if "content" not in step: + logger.error(f"Error: 'content' key missing at step {j} at {i} agent output.") return f"Error: 'content' key missing at step {j} at {i} agent output." if step["role"].strip() != "System:": # Filter out system prompts @@ -654,17 +1030,22 @@ def parse_sequential_workflow_output(data: Optional[str]) -> str: # output += "
\n\n---\n" output += f"Overall Completion Time: `{overall_time}`" + logger.info("SequentialWorkflow output parsed successfully.") return output except json.JSONDecodeError as e : + logger.error(f"Error during parsing SequentialWorkflow output: {e}", exc_info=True) return f"Error during parsing json.JSONDecodeError : {e}" except Exception as e: + logger.error(f"Error during parsing SequentialWorkflow output: {e}", exc_info=True) return f"Error during parsing: {str(e)}" -def parse_spreadsheet_swarm_output(file_path: str) -> str: +def parse_spreadsheet_swarm_output(file_path: str, error_display=None) -> str: """Parses the SpreadSheetSwarm output CSV file and formats it for display.""" + logger.info("Parsing SpreadSheetSwarm output...") if not file_path: + logger.error("No file path provided for parsing SpreadSheetSwarm output.") return "Error: No file path provided for parsing." print(f"Parsing spreadsheet output from: {file_path}") @@ -674,7 +1055,8 @@ def parse_spreadsheet_swarm_output(file_path: str) -> str: csv_reader = csv.reader(file) header = next(csv_reader, None) # Read the header row if not header: - return "Error: CSV file is empty or has no header" + logger.error("CSV file is empty or has no header.") + return "Error: CSV file is empty or has no header" output = "### Spreadsheet Swarm Output ###\n\n" output += "| " + " | ".join(header) + " |\n" # Adding header @@ -684,15 +1066,20 @@ def parse_spreadsheet_swarm_output(file_path: str) -> str: output += "| " + " | ".join(row) + " |\n" # Adding row output += "\n" + logger.info("SpreadSheetSwarm output parsed successfully.") return output - except FileNotFoundError: + except FileNotFoundError as e: + logger.error(f"Error during parsing SpreadSheetSwarm output: {e}", exc_info=True) return "Error: CSV file not found." except Exception as e: + logger.error(f"Error during parsing SpreadSheetSwarm output: {e}", exc_info=True) return f"Error during parsing CSV file: {str(e)}" -def parse_json_output(data:str) -> str: +def parse_json_output(data:str, error_display=None) -> str: """Parses a JSON string and formats it for display.""" + logger.info("Parsing JSON output...") if not data: + logger.error("No data provided for parsing JSON output.") return "Error: No data provided for parsing." print(f"Parsing json output from: {data}") @@ -716,13 +1103,15 @@ def parse_json_output(data:str) -> str: else : output += f"**{key}**: {value}\n" - + logger.info("JSON output parsed successfully.") return output except json.JSONDecodeError as e: - return f"Error: Invalid JSON format - {e}" + logger.error(f"Error during parsing JSON output: {e}", exc_info=True) + return f"Error: Invalid JSON format - {e}" except Exception as e: + logger.error(f"Error during parsing JSON output: {e}", exc_info=True) return f"Error during JSON parsing: {str(e)}" class UI: @@ -885,7 +1274,6 @@ class UI: lines=10, ) return logs_display - def update_flow_agents(agent_keys): """Update flow agents based on selected agent prompts.""" if not agent_keys: @@ -914,8 +1302,6 @@ def create_app(): "gemini", "mistral", "groq", - "nvidia_nim", - "huggingface", "perplexity", ] @@ -1067,9 +1453,6 @@ def create_app(): create_agent_button = gr.Button( "Save New Prompt" ) - generate_agent_button = gr.Button( - "Generate_Final_p" - ) with gr.Column(): create_agent_status = gr.Textbox( label="Status", @@ -1114,7 +1497,21 @@ def create_app(): interactive=False, lines=10, ) - + with gr.Row(): + log_display = gr.Textbox( + label="Logs", + placeholder="Logs will be displayed here...", + interactive=False, + lines=5, + visible=False, + ) + error_display = gr.Textbox( + label="Error", + placeholder="Errors will be displayed here...", + interactive=False, + lines=5, + visible=False, + ) def update_agent_dropdown(): """Update agent dropdown when a new agent is added""" global AGENT_PROMPTS @@ -1153,8 +1550,6 @@ def create_app(): def update_model_dropdown(provider): """Update model dropdown based on selected provider.""" models = filtered_models.get(provider, []) - if provider == "huggingface": - models = [f"huggingface/{model}" for model in models] return gr.update( choices=models, value=models[0] if models else None, @@ -1216,53 +1611,9 @@ def create_app(): ): """Execute the task and update the UI with progress.""" try: - if not task: - yield ( - "Please provide a task description.", - "Error: Missing task", - "", - ) - return - - if ( - not agent_prompt_selector - or len(agent_prompt_selector) == 0 - ): - yield ( - "Please select at least one agent.", - "Error: No agents selected", - "", - ) - return - - if not provider: - yield ( - "Please select a provider.", - "Error: No provider selected", - "", - ) - return - - if not model_name: - yield ( - "Please select a model.", - "Error: No model selected", - "", - ) - if provider == "huggingface": - model_name = f"huggingface/{model_name}" - return - - if not api_key: - yield ( - "Please enter an API Key", - "Error: API Key is required", - "", - ) - return - # Update status - yield "Processing...", "Running task...", "" + yield "Processing...", "Running task...", "", gr.update(visible=False), gr.update(visible=False) + # Prepare flow for AgentRearrange flow = None @@ -1273,6 +1624,8 @@ def create_app(): " configuration.", "Error: Flow not configured", "", + gr.update(visible=True), + gr.update(visible=False) ) return flow = flow_text @@ -1297,14 +1650,6 @@ def create_app(): set_key(env_path, "MISTRAL_API_KEY", api_key) elif provider == "groq": set_key(env_path, "GROQ_API_KEY", api_key) - elif provider == "nvidia_nim": - set_key( - env_path, "NVIDIA_NIM_API_KEY", api_key - ) - elif provider == "huggingface": - set_key( - env_path, "HUGGINGFACE_API_KEY", api_key - ) elif provider == "perplexity": set_key( env_path, "PERPLEXITY_API_KEY", api_key @@ -1315,6 +1660,8 @@ def create_app(): " present", f"Error: {provider} not supported", "", + gr.update(visible=True), + gr.update(visible=False) ) return @@ -1338,7 +1685,7 @@ def create_app(): } # Execute task - result, router, error = await execute_task( + async for result, router, error in execute_task( task=task, max_loops=max_loops, dynamic_temp=dynamic_temp, @@ -1351,77 +1698,19 @@ def create_app(): temperature=temperature, max_tokens=max_tokens, agents=agents_dict, # Changed here - ) - - if error: - yield f"Error: {error}", f"Error: {error}", "" - return - - if result is None: - yield "Error: No output from swarm.", f"Error: {error}", "" - return - - # Format output based on swarm type - output_lines = [] - if swarm_type == "SpreadSheetSwarm": - # Check if the result is a file path or JSON - if os.path.exists(result): - parsed_output = ( - parse_spreadsheet_swarm_output(result) - ) - output_lines.append(parsed_output) - else: - parsed_output = parse_json_output(result) - output_lines.append(parsed_output) - - elif swarm_type == "AgentRearrange": - try: - parsed_output = ( - parse_agent_rearrange_output(result) - ) - except ValueError as e: - parsed_output = ( - f"Error during parsing: {e}" - ) # Handle ValueError - output_lines.append(parsed_output) - elif swarm_type == "MixtureOfAgents": - parsed_output = ( - parse_mixture_of_agents_output(result) - ) - output_lines.append(parsed_output) - elif swarm_type == "SequentialWorkflow": - parsed_output = ( - parse_sequential_workflow_output(result) - ) - output_lines.append(parsed_output) - elif isinstance( - result, dict - ): # checking if result is dict or string. - if swarm_type == "MixtureOfAgents": - # Add individual agent outputs - for key, value in result.items(): - if key != "Aggregated Summary": - output_lines.append( - f"### {key} ###\n{value}\n" - ) - # Add aggregated summary at the end - if "Aggregated Summary" in result: - output_lines.append( - "\n### Aggregated Summary" - f" ###\n{result['Aggregated Summary']}\n{'=' * 50}\n" - ) - else: # SequentialWorkflow, ConcurrentWorkflow, Auto - for key, value in result.items(): - output_lines.append( - f"### {key} ###\n{value}\n{'=' * 50}\n" - ) - elif isinstance(result, str): - output_lines.append(str(result)) - - yield "\n".join(output_lines), "Completed", api_key - + log_display=log_display, + error_display = error_display + ): + if error: + yield f"Error: {error}", f"Error: {error}", "", gr.update(visible=True), gr.update(visible=True) + return + if result is not None: + formatted_output = format_output(result, swarm_type, error_display) + yield formatted_output, "Completed", api_key, gr.update(visible=False), gr.update(visible=False) + return except Exception as e: - yield f"Error: {str(e)}", f"Error: {str(e)}", "" + yield f"Error: {str(e)}", f"Error: {str(e)}", "", gr.update(visible=True), gr.update(visible=True) + return # Connect the update functions agent_selector.change( @@ -1469,12 +1758,14 @@ def create_app(): agent_output_display, loading_status, env_api_key_textbox, + error_display, + log_display, ], ) # Connect cancel button to interrupt processing def cancel_task(): - return "Task cancelled.", "Cancelled", "" + return "Task cancelled.", "Cancelled", "", gr.update(visible=False), gr.update(visible=False) cancel_button.click( fn=cancel_task, @@ -1483,6 +1774,8 @@ def create_app(): agent_output_display, loading_status, env_api_key_textbox, + error_display, + log_display ], cancels=run_event, )