pull/709/head
harshalmore31 4 months ago
parent baa4215083
commit 5cf3219e8e

@ -5,22 +5,10 @@ import json
import time
import asyncio
import gradio as gr
import re
# Import necessary classes and functions from swarms library
from swarms.structs.agent import Agent
from swarms.structs.concurrent_workflow import ConcurrentWorkflow
from swarms.structs.mixture_of_agents import MixtureOfAgents
from swarms.structs.rearrange import AgentRearrange
from swarms.structs.sequential_workflow import SequentialWorkflow
from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm
from swarms.structs.swarm_matcher import swarm_matcher, SwarmMatcher, SwarmMatcherConfig, initialize_swarm_types
from swarms.structs.swarm_router import SwarmRouter
from swarms.utils.loguru_logger import initialize_logger
from groq_model import OpenAIChat # Import OpenAIChat from the correct location
from swarms.utils.file_processing import create_file_in_folder
from doc_master import doc_master
from swarm_models import OpenAIChat
# Initialize logger
@ -33,6 +21,9 @@ load_dotenv()
# Get the OpenAI API key from the environment variable
api_key = os.getenv("GROQ_API_KEY")
# changed to swarms_models
# adding functionality to view other models of swarms models
# Model initialization
model = OpenAIChat(
openai_api_base="https://api.groq.com/openai/v1",
@ -42,21 +33,18 @@ model = OpenAIChat(
)
# Define the path to agent_prompts.json
# locates the json file and then fetches the promopts
PROMPT_JSON_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "agent_prompts.json")
logger.info(f"Loading prompts from: {PROMPT_JSON_PATH}")
# Global log storage
execution_logs = []
def log_event(level: str, message: str, metadata: Optional[Dict] = None):
"""
Log an event and store it in the execution logs.
Args:
level: Log level (info, warning, error, etc.)
message: Log message
metadata: Optional metadata dictionary
"""
def log_event(level: str, message: str, metadata: Optional[Dict] = None):
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
log_entry = {
"timestamp": timestamp,
@ -65,25 +53,12 @@ def log_event(level: str, message: str, metadata: Optional[Dict] = None):
"metadata": metadata or {}
}
execution_logs.append(log_entry)
# Also log to the logger
log_func = getattr(logger, level.lower(), logger.info)
log_func(message)
def get_logs(router: Optional['SwarmRouter'] = None) -> List[str]:
"""
Get formatted logs from both the execution logs and router logs if available.
Args:
router: Optional SwarmRouter instance to get additional logs from
Returns:
List of formatted log strings
"""
formatted_logs = []
# Add execution logs
for log in execution_logs:
metadata_str = ""
if log["metadata"]:
@ -107,12 +82,10 @@ def get_logs(router: Optional['SwarmRouter'] = None) -> List[str]:
def clear_logs():
"""Clear the execution logs."""
execution_logs.clear()
def load_prompts_from_json() -> Dict[str, str]:
"""Robust prompt loading with comprehensive error handling."""
try:
if not os.path.exists(PROMPT_JSON_PATH):
error_msg = f"Prompts file not found at: {PROMPT_JSON_PATH}"
@ -131,20 +104,20 @@ def load_prompts_from_json() -> Dict[str, str]:
error_msg = f"Invalid JSON in prompts file: {str(e)}"
log_event("error", error_msg)
raise
if not isinstance(data, dict):
error_msg = "Prompts file must contain a JSON object"
log_event("error", error_msg)
raise ValueError(error_msg)
prompts = {}
for agent_name, details in data.items():
if not isinstance(details, dict) or "system_prompt" not in details:
log_event("warning", f"Skipping invalid agent config: {agent_name}")
continue
prompts[f"agent.{agent_name}"] = details["system_prompt"]
if not prompts:
error_msg = "No valid prompts found in prompts file"
log_event("error", error_msg)
@ -154,10 +127,10 @@ def load_prompts_from_json() -> Dict[str, str]:
"agent.summarizer": "You are a summarization agent...",
"agent.onboarding_agent": "You are an onboarding agent..."
}
log_event("info", f"Successfully loaded {len(prompts)} prompts from JSON")
return prompts
except Exception as e:
error_msg = f"Error loading prompts: {str(e)}"
log_event("error", error_msg)
@ -169,15 +142,10 @@ def load_prompts_from_json() -> Dict[str, str]:
}
# Load prompts
AGENT_PROMPTS = load_prompts_from_json()
def initialize_agents(
data_temp: float,
sum_temp: float,
agent_keys: List[str]
) -> List[Agent]:
"""Enhanced agent initialization with more robust configuration."""
def initialize_agents(dynamic_temp: float, agent_keys: List[str]) -> List[Agent]:
agents = []
seen_names = set()
for agent_key in agent_keys:
@ -186,7 +154,7 @@ def initialize_agents(
agent_prompt = AGENT_PROMPTS[agent_key]
agent_name = agent_key.split('.')[-1]
# Ensure unique agent names
base_name = agent_name
counter = 1
@ -194,7 +162,7 @@ def initialize_agents(
agent_name = f"{base_name}_{counter}"
counter += 1
seen_names.add(agent_name)
agent = Agent(
agent_name=f"Agent-{agent_name}",
system_prompt=agent_prompt,
@ -207,17 +175,18 @@ def initialize_agents(
user_name="pe_firm",
retry_attempts=1,
context_length=200000,
output_type="string",
temperature=data_temp,
output_type="string", # here is the output type which is string
temperature=dynamic_temp,
)
agents.append(agent)
return agents
async def execute_task(task: str, max_loops: int, data_temp: float, sum_temp: float,
swarm_type: str, agent_keys: List[str], flow: str = None) -> Tuple[Dict[str, str], 'SwarmRouter', str]:
async def execute_task(task: str, max_loops: int, dynamic_temp: float,
swarm_type: str, agent_keys: List[str], flow: str = None) -> Tuple[Any, 'SwarmRouter', str]:
"""
Enhanced task execution with comprehensive error handling and result processing.
Enhanced task execution with comprehensive error handling and raw result return.
"""
start_time = time.time()
log_event("info", f"Starting task execution: {task}")
@ -225,14 +194,14 @@ async def execute_task(task: str, max_loops: int, data_temp: float, sum_temp: fl
try:
# Initialize agents
try:
agents = initialize_agents(data_temp, sum_temp, agent_keys)
agents = initialize_agents(dynamic_temp, agent_keys)
log_event("info", f"Successfully initialized {len(agents)} agents")
except Exception as e:
error_msg = f"Agent initialization error: {str(e)}"
log_event("error", error_msg)
return {}, None, error_msg
# Create a SwarmRouter to manage the different swarm types
return None, None, error_msg
# Swarm-specific configurations
router_kwargs = {
"name": "multi-agent-workflow",
"description": f"Executing {swarm_type} workflow",
@ -240,11 +209,21 @@ async def execute_task(task: str, max_loops: int, data_temp: float, sum_temp: fl
"agents": agents,
"autosave": True,
"return_json": True,
"output_type": "string"
"output_type": "string",
"swarm_type": swarm_type, # Pass swarm_type here
}
# Swarm-specific configurations
if swarm_type == "AgentRearrange":
if not flow:
return None, None, "Flow configuration is required for AgentRearrange"
router_kwargs["flow"] = flow
if swarm_type == "MixtureOfAgents":
if len(agents) < 2:
return None, None, "MixtureOfAgents requires at least 2 agents"
if swarm_type == "SpreadSheetSwarm":
# spread sheet swarm needs specific setup
output_dir = "swarm_outputs"
os.makedirs(output_dir, exist_ok=True)
@ -262,139 +241,50 @@ async def execute_task(task: str, max_loops: int, data_temp: float, sum_temp: fl
except OSError as e:
error_msg = f"Invalid output path: {str(e)}"
log_event("error", error_msg)
return {}, None, error_msg
return None, None, error_msg
# Create and initialize SpreadSheetSwarm
try:
swarm = SpreadSheetSwarm(
agents=agents,
max_loops=max_loops,
name="spreadsheet-swarm",
description="SpreadSheet processing workflow",
save_file_path=output_path,
workspace_dir=output_dir,
llm=model,
autosave=True
)
except Exception as e:
error_msg = f"Failed to initialize SpreadSheetSwarm: {str(e)}"
log_event("error", error_msg)
return {}, None, error_msg
router_kwargs["output_path"] = output_path
# Create and execute SwarmRouter
try:
timeout = 450 if swarm_type != "SpreadSheetSwarm" else 900 # SpreadSheetSwarm will have different timeout.
router = SwarmRouter(**router_kwargs)
# Execute the swarm with proper error handling
try:
result = await asyncio.wait_for(
asyncio.to_thread(lambda: swarm.run(task=task)),
timeout=900 # 15 minutes timeout
if swarm_type == "AgentRearrange":
result = await asyncio.wait_for(
asyncio.to_thread(router._run, task),
timeout=timeout
)
return result, router, ""
result = await asyncio.wait_for(
asyncio.to_thread(router.run, task),
timeout=timeout
)
if swarm_type == "SpreadSheetSwarm":
# Verify the output file was created
if not os.path.exists(output_path):
error_msg = "Output file was not created"
log_event("error", error_msg)
return {}, None, error_msg
return None, None, error_msg
return {
"CSV File Path": output_path,
"Status": "Success",
"Message": "Spreadsheet processing completed successfully",
"Result": str(result)
}, swarm, ""
return output_path, router, ""
except asyncio.TimeoutError:
error_msg = "SpreadSheetSwarm execution timed out after 900 seconds"
log_event("error", error_msg)
return {}, None, error_msg
except Exception as e:
error_msg = f"SpreadSheetSwarm execution error: {str(e)}"
log_event("error", error_msg)
return {}, None, error_msg
# Rest of the swarm type handling...
elif swarm_type == "AgentRearrange":
if not flow:
return {}, None, "Flow configuration is required for AgentRearrange"
router_kwargs["flow"] = flow
elif swarm_type == "MixtureOfAgents":
if len(agents) < 2:
return {}, None, "MixtureOfAgents requires at least 2 agents"
router_kwargs.update({
"aggregator_agent": agents[-1],
"layers": max_loops
})
# Create router and execute task for non-SpreadSheetSwarm types
if swarm_type != "SpreadSheetSwarm":
try:
timeout = 450 # Default timeout
await asyncio.sleep(0.5)
router = SwarmRouter(**router_kwargs)
router.swarm_type = swarm_type
result = await asyncio.wait_for(
asyncio.to_thread(router.run, task=task),
timeout=timeout
)
# Process results
def process_result(result):
"""Process and standardize the result output."""
if isinstance(result, str):
try:
# Attempt to parse as JSON
parsed_result = json.loads(result)
return parsed_result
except json.JSONDecodeError:
# Fallback to string result
return {"Final Output": result}
if isinstance(result, dict):
# Clean and standardize dictionary results
return {str(k): str(v) for k, v in result.items()}
return {"Final Output": str(result)}
responses = process_result(result)
return responses, router, ""
except asyncio.TimeoutError:
return result, router, ""
except asyncio.TimeoutError:
error_msg = f"Task execution timed out after {timeout} seconds"
log_event("error", error_msg)
return {}, None, error_msg
except Exception as e:
error_msg = f"Task execution error: {str(e)}"
log_event("error", error_msg)
return {}, None, error_msg
return None, None, error_msg
except Exception as e:
error_msg = f"Task execution error: {str(e)}"
log_event("error", error_msg)
return None, None, error_msg
except Exception as e:
error_msg = f"Unexpected error in task execution: {str(e)}"
log_event("error", error_msg)
return {}, None, error_msg
def _extract_concurrent_responses(result: str, agents: List[Agent]) -> Dict[str, str]:
"""
Extract unique responses for each agent in a ConcurrentWorkflow.
Args:
result (str): Full output from SwarmRouter
agents (List[Agent]): List of agents used in the task
Returns:
Dict[str, str]: Unique responses for each agent
"""
agent_responses = {}
for agent in agents:
# Pattern to capture "Agent Name: ... Response: ... " format
pattern = rf"Agent Name:\s*{re.escape(agent.agent_name)}\s*Response:\s*(.+?)(?=Agent Name:|$)"
match = re.search(pattern, result, re.DOTALL | re.IGNORECASE | re.MULTILINE)
if match:
agent_responses[agent.agent_name] = match.group(1).strip()
else:
agent_responses[agent.agent_name] = "No response from the Agent"
return agent_responses
return None, None, error_msg
class UI:
@ -438,7 +328,7 @@ class UI:
choices = ["No options available"]
if value is None and choices:
value = choices[0] if not multiselect else [choices[0]]
dropdown = gr.Dropdown(
label=label,
choices=choices,
@ -448,7 +338,7 @@ class UI:
)
self.components[f'dropdown_{label}'] = dropdown
return dropdown
def create_button(self, text, variant="primary"):
button = gr.Button(text, variant=variant)
self.components[f'button_{text}'] = button
@ -467,20 +357,20 @@ class UI:
def create_tab(self, label, content_function):
with gr.Tab(label):
content_function(self)
content_function(self)
def set_event_listener(self, button, function, inputs, outputs):
button.click(function, inputs=inputs, outputs=outputs)
button.click(function, inputs=inputs, outputs=outputs)
def get_components(self, *keys):
if not keys:
return self.components # return all components
return [self.components[key] for key in keys]
if not keys:
return self.components # return all components
return [self.components[key] for key in keys]
def create_json_output(self, label, placeholder=""):
json_output = gr.JSON(
label=label,
value = {},
value={},
elem_classes=["custom-output"],
)
self.components[f'json_output_{label}'] = json_output
@ -496,22 +386,22 @@ class UI:
inputs=[watch_component],
outputs=[component]
)
@staticmethod
def create_ui_theme(primary_color="red"):
return gr.themes.Soft(
primary_hue=primary_color,
secondary_hue="gray",
neutral_hue="gray",
).set(
body_background_fill="#20252c",
body_text_color="#f0f0f0",
button_primary_background_fill=primary_color,
button_primary_text_color="#ffffff",
button_secondary_background_fill=primary_color,
button_secondary_text_color="#ffffff",
shadow_drop="0px 2px 4px rgba(0, 0, 0, 0.3)",
)
primary_hue=primary_color,
secondary_hue="gray",
neutral_hue="gray",
).set(
body_background_fill="#20252c",
body_text_color="#f0f0f0",
button_primary_background_fill=primary_color,
button_primary_text_color="#ffffff",
button_secondary_background_fill=primary_color,
button_secondary_text_color="#ffffff",
shadow_drop="0px 2px 4px rgba(0, 0, 0, 0.3)",
)
def create_agent_details_tab(self):
"""Create the agent details tab content."""
@ -545,6 +435,7 @@ class UI:
)
return logs_display
def create_app():
# Initialize UI
theme = UI.create_ui_theme(primary_color="red")
@ -561,7 +452,7 @@ def create_app():
with gr.Column(scale=4):
with gr.Row():
task_input = gr.Textbox(
label="Task Description",
label="Task Description",
placeholder="Describe your task here...",
lines=3
)
@ -589,7 +480,7 @@ def create_app():
multiselect=False,
interactive=True
)
# Flow configuration components for AgentRearrange
with gr.Column(visible=False) as flow_config:
flow_text = gr.Textbox(
@ -643,7 +534,7 @@ def create_app():
interactive=False,
lines=10
)
def update_flow_agents(agent_keys):
"""Update flow agents based on selected agent prompts."""
if not agent_keys:
@ -666,10 +557,10 @@ def create_app():
is_agent_rearrange = swarm_type == "AgentRearrange"
is_mixture = swarm_type == "MixtureOfAgents"
is_spreadsheet = swarm_type == "SpreadSheetSwarm"
max_loops = 5 if is_mixture or is_spreadsheet else 10
log_event("info", f"Swarm type changed to {swarm_type}, max loops set to {max_loops}")
# Return visibility state for flow configuration and max loops update
return (
gr.update(visible=is_agent_rearrange), # For flow_config
@ -677,22 +568,22 @@ def create_app():
f"Selected {swarm_type}" # For loading_status
)
async def run_task_wrapper(task, max_loops, data_temp, swarm_type, agent_prompt_selector, flow_text, sum_temp):
async def run_task_wrapper(task, max_loops, dynamic_temp, swarm_type, agent_prompt_selector, flow_text):
"""Execute the task and update the UI with progress."""
try:
if not task:
yield "Please provide a task description.", "Error: Missing task"
return
if not agent_prompt_selector or len(agent_prompt_selector) == 0:
yield "Please select at least one agent.", "Error: No agents selected"
return
log_event("info", f"Starting task with agents: {agent_prompt_selector}")
# Update status
yield "Processing...", "Running task..."
# Prepare flow for AgentRearrange
flow = None
if swarm_type == "AgentRearrange":
@ -700,49 +591,45 @@ def create_app():
yield "Please provide the agent flow configuration.", "Error: Flow not configured"
return
flow = flow_text
# Execute task
responses, router, error = await execute_task(
result, router, error = await execute_task(
task=task,
max_loops=max_loops,
data_temp=data_temp,
sum_temp=sum_temp,
dynamic_temp=dynamic_temp,
swarm_type=swarm_type,
agent_keys=agent_prompt_selector,
flow=flow
)
if error:
yield f"Error: {error}", "Error occurred"
return
# Format output based on swarm type
output_lines = []
if router.swarm_type == "AgentRearrange":
for key, value in responses.items():
output_lines.append(f"### Step {key} ###\n{value}\n{'='*50}\n")
elif router.swarm_type == "MixtureOfAgents":
# Add individual agent outputs
for key, value in responses.items():
if key != "Aggregated Summary":
output_lines.append(f"### {key} ###\n{value}\n")
# Add aggregated summary at the end
if "Aggregated Summary" in responses:
output_lines.append(f"\n### Aggregated Summary ###\n{responses['Aggregated Summary']}\n{'='*50}\n")
elif router.swarm_type == "SpreadSheetSwarm":
output_lines.append(f"### Spreadsheet Output ###\n{responses.get('CSV File Path', 'No file path provided')}\n{'='*50}\n")
elif router.swarm_type == "ConcurrentWorkflow":
for key, value in responses.items():
output_lines.append(f"### {key} ###\n{value}\n{'='*50}\n")
else: # SequentialWorkflow, auto
if isinstance(responses, dict):
for key, value in responses.items():
output_lines.append(f"### {key} ###\n{value}\n{'='*50}\n")
else:
output_lines.append(str(responses))
if swarm_type == "SpreadSheetSwarm":
output_lines.append(f"### Spreadsheet Output ###\n{result}\n{'=' * 50}\n")
elif isinstance(result, dict): # checking if result is dict or string.
if swarm_type == "AgentRearrange":
for key, value in result.items():
output_lines.append(f"### Step {key} ###\n{value}\n{'=' * 50}\n")
elif swarm_type == "MixtureOfAgents":
# Add individual agent outputs
for key, value in result.items():
if key != "Aggregated Summary":
output_lines.append(f"### {key} ###\n{value}\n")
# Add aggregated summary at the end
if "Aggregated Summary" in result:
output_lines.append(f"\n### Aggregated Summary ###\n{result['Aggregated Summary']}\n{'=' * 50}\n")
else: # SequentialWorkflow, ConcurrentWorkflow, Auto
for key, value in result.items():
output_lines.append(f"### {key} ###\n{value}\n{'=' * 50}\n")
elif isinstance(result, str):
output_lines.append(str(result))
yield "\n".join(output_lines), "Completed"
except Exception as e:
error_msg = f"Error: {str(e)}"
log_event("error", error_msg)
@ -774,7 +661,7 @@ def create_app():
def cancel_task():
log_event("info", "Task cancelled by user")
return "Task cancelled.", "Cancelled"
cancel_button.click(
fn=cancel_task,
inputs=None,
@ -782,37 +669,26 @@ def create_app():
cancels=run_event
)
# with gr.Column(scale=1): # Right column
# with gr.Tabs():
# with gr.Tab("Agent Details"):
# gr.Markdown("""
# ### Available Agent Types
# - **Data Extraction Agent**: Specialized in extracting relevant information
# - **Summary Agent**: Creates concise summaries of information
# - **Analysis Agent**: Performs detailed analysis of data
with gr.Column(scale=1): # Right column
with gr.Tabs():
with gr.Tab("Agent Details"):
ui.create_agent_details_tab()
with gr.Tab("Logs"):
logs_display = ui.create_logs_tab()
def update_logs_display():
"""Update logs display with current logs."""
logs = get_logs()
formatted_logs = "\n".join(logs)
return formatted_logs
# Update logs when tab is selected
logs_tab = gr.Tab("Logs")
logs_tab.select(fn=update_logs_display, inputs=None, outputs=[logs_display])
return ui.build()
# Launch the app
if __name__ == "__main__":
app = create_app()
app.launch()
Loading…
Cancel
Save