fix swarm router and now sequential workflow

master
Kye Gomez 2 days ago
parent bcd8a31417
commit 3c0298fe3e

@ -35,10 +35,9 @@ agent = Agent(
You communicate in precise, technical terms while maintaining clarity for stakeholders.""", You communicate in precise, technical terms while maintaining clarity for stakeholders.""",
max_loops=1, max_loops=1,
model_name="gpt-4o-mini", model_name="claude-3-sonnet-20240229",
dynamic_temperature_enabled=True, dynamic_temperature_enabled=True,
output_type="all", output_type="all",
max_tokens=16384,
# dashboard=True # dashboard=True
) )

@ -0,0 +1,74 @@
from swarms import Agent, SequentialWorkflow
import litellm
litellm._turn_on_debug()
# Initialize market research agent
market_researcher = Agent(
agent_name="Market-Researcher",
system_prompt="""You are a market research specialist. Your tasks include:
1. Analyzing market trends and patterns
2. Identifying market opportunities and threats
3. Evaluating competitor strategies
4. Assessing customer needs and preferences
5. Providing actionable market insights""",
model_name="claude-3-sonnet-20240229",
max_loops=1,
temperature=0.7,
)
# Initialize financial analyst agent
financial_analyst = Agent(
agent_name="Financial-Analyst",
system_prompt="""You are a financial analysis expert. Your responsibilities include:
1. Analyzing financial statements
2. Evaluating investment opportunities
3. Assessing risk factors
4. Providing financial forecasts
5. Recommending financial strategies""",
model_name="claude-3-sonnet-20240229",
max_loops=1,
temperature=0.7,
)
# Initialize technical analyst agent
technical_analyst = Agent(
agent_name="Technical-Analyst",
system_prompt="""You are a technical analysis specialist. Your focus areas include:
1. Analyzing price patterns and trends
2. Evaluating technical indicators
3. Identifying support and resistance levels
4. Assessing market momentum
5. Providing trading recommendations""",
model_name="claude-3-sonnet-20240229",
max_loops=1,
temperature=0.7,
)
# Create list of agents
agents = [market_researcher, financial_analyst, technical_analyst]
# # Initialize the concurrent workflow
# workflow = ConcurrentWorkflow(
# name="market-analysis-workflow",
# agents=agents,
# max_loops=1,
# )
# # Run the workflow
# result = workflow.run(
# "Analyze Tesla (TSLA) stock from market, financial, and technical perspectives"
# )
router = SequentialWorkflow(
name="market-analysis-router",
agents=agents,
max_loops=1,
# output_type="all",
)
result = router.run(
"Analyze Tesla (TSLA) stock from market, financial, and technical perspectives"
)
print(result)

@ -0,0 +1,68 @@
from swarms import Agent, SwarmRouter
# Initialize market research agent
market_researcher = Agent(
agent_name="Market-Researcher",
system_prompt="""You are a market research specialist. Your tasks include:
1. Analyzing market trends and patterns
2. Identifying market opportunities and threats
3. Evaluating competitor strategies
4. Assessing customer needs and preferences
5. Providing actionable market insights""",
model_name="claude-sonnet-4-20250514",
max_loops=1,
)
# Initialize financial analyst agent
financial_analyst = Agent(
agent_name="Financial-Analyst",
system_prompt="""You are a financial analysis expert. Your responsibilities include:
1. Analyzing financial statements
2. Evaluating investment opportunities
3. Assessing risk factors
4. Providing financial forecasts
5. Recommending financial strategies""",
model_name="claude-sonnet-4-20250514",
max_loops=1,
)
# Initialize technical analyst agent
technical_analyst = Agent(
agent_name="Technical-Analyst",
system_prompt="""You are a technical analysis specialist. Your focus areas include:
1. Analyzing price patterns and trends
2. Evaluating technical indicators
3. Identifying support and resistance levels
4. Assessing market momentum
5. Providing trading recommendations""",
model_name="claude-sonnet-4-20250514",
max_loops=1,
)
# Create list of agents
agents = [market_researcher, financial_analyst, technical_analyst]
# # Initialize the concurrent workflow
# workflow = ConcurrentWorkflow(
# name="market-analysis-workflow",
# agents=agents,
# max_loops=1,
# )
# # Run the workflow
# result = workflow.run(
# "Analyze Tesla (TSLA) stock from market, financial, and technical perspectives"
# )
router = SwarmRouter(
name="market-analysis-router",
swarm_type="ConcurrentWorkflow",
agents=agents,
max_loops=1,
# output_type="all",
)
result = router.run(
"Analyze Tesla (TSLA) stock from market, financial, and technical perspectives"
)
print(result)

@ -161,7 +161,6 @@ class AgentToolExecutionError(AgentError):
pass pass
# [FEAT][AGENT]
class Agent: class Agent:
""" """
Agent is the backbone to connect LLMs with tools and long term memory. Agent also provides the ability to Agent is the backbone to connect LLMs with tools and long term memory. Agent also provides the ability to
@ -1152,12 +1151,7 @@ class Agent:
self.save() self.save()
logger.error( logger.error(
f"Attempt {attempt+1}/{self.max_retries}: Error generating response in loop {loop_count} for agent '{self.agent_name}': {str(e)} | " f"Attempt {attempt+1}/{self.retry_attempts}: Error generating response in loop {loop_count} for agent '{self.agent_name}': {str(e)} | "
f"Error type: {type(e).__name__}, Error details: {e.__dict__ if hasattr(e, '__dict__') else 'No additional details'} | "
f"Current task: '{task}', Agent state: max_loops={self.max_loops}, "
f"model={getattr(self.llm, 'model_name', 'unknown')}, "
f"temperature={getattr(self.llm, 'temperature', 'unknown')}"
f"{f' | Traceback: {e.__traceback__}' if hasattr(e, '__traceback__') else ''}"
) )
attempt += 1 attempt += 1

@ -1,4 +1,3 @@
import asyncio
import json import json
import uuid import uuid
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
@ -344,7 +343,8 @@ class AgentRearrange(BaseSwarm):
logger.info("Task execution completed") logger.info("Task execution completed")
return history_output_formatter( return history_output_formatter(
self.conversation, self.output_type conversation=self.conversation,
type=self.output_type,
) )
except Exception as e: except Exception as e:
@ -364,11 +364,6 @@ class AgentRearrange(BaseSwarm):
self, self,
task: str = None, task: str = None,
img: str = None, img: str = None,
device: str = "cpu",
device_id: int = 2,
all_cores: bool = True,
all_gpus: bool = False,
no_use_clusterops: bool = True,
*args, *args,
**kwargs, **kwargs,
): ):
@ -481,58 +476,11 @@ class AgentRearrange(BaseSwarm):
except Exception as e: except Exception as e:
self._catch_error(e) self._catch_error(e)
async def abatch_run(
self,
tasks: List[str],
img: Optional[List[str]] = None,
batch_size: int = 10,
*args,
**kwargs,
) -> List[str]:
"""
Asynchronously process multiple tasks in batches.
Args:
tasks: List of tasks to process
img: Optional list of images corresponding to tasks
batch_size: Number of tasks to process simultaneously
Returns:
List of results corresponding to input tasks
"""
try:
results = []
for i in range(0, len(tasks), batch_size):
batch_tasks = tasks[i : i + batch_size]
batch_imgs = (
img[i : i + batch_size]
if img
else [None] * len(batch_tasks)
)
# Process batch using asyncio.gather
batch_coros = [
self.astream(
task=task, img=img_path, *args, **kwargs
)
for task, img_path in zip(batch_tasks, batch_imgs)
]
batch_results = await asyncio.gather(*batch_coros)
results.extend(batch_results)
return results
except Exception as e:
self._catch_error(e)
def concurrent_run( def concurrent_run(
self, self,
tasks: List[str], tasks: List[str],
img: Optional[List[str]] = None, img: Optional[List[str]] = None,
max_workers: Optional[int] = None, max_workers: Optional[int] = None,
device: str = "cpu",
device_id: int = None,
all_cores: bool = True,
all_gpus: bool = False,
*args, *args,
**kwargs, **kwargs,
) -> List[str]: ) -> List[str]:
@ -561,10 +509,6 @@ class AgentRearrange(BaseSwarm):
self.run, self.run,
task=task, task=task,
img=img_path, img=img_path,
device=device,
device_id=device_id,
all_cores=all_cores,
all_gpus=all_gpus,
*args, *args,
**kwargs, **kwargs,
) )

@ -49,15 +49,12 @@ class SequentialWorkflow:
self.flow = self.sequential_flow() self.flow = self.sequential_flow()
self.agent_rearrange = AgentRearrange( self.agent_rearrange = AgentRearrange(
name=name, name=self.name,
description=description, description=self.description,
agents=agents, agents=self.agents,
flow=self.flow, flow=self.flow,
max_loops=max_loops, max_loops=self.max_loops,
output_type=output_type, output_type=self.output_type,
shared_memory_system=shared_memory_system,
*args,
**kwargs,
) )
def sequential_flow(self): def sequential_flow(self):
@ -105,11 +102,7 @@ class SequentialWorkflow:
self, self,
task: str, task: str,
img: Optional[str] = None, img: Optional[str] = None,
device: str = "cpu", imgs: Optional[List[str]] = None,
all_cores: bool = False,
all_gpus: bool = False,
device_id: int = 0,
no_use_clusterops: bool = True,
*args, *args,
**kwargs, **kwargs,
): ):
@ -134,14 +127,14 @@ class SequentialWorkflow:
""" """
try: try:
result = self.agent_rearrange.run( return self.agent_rearrange.run(
task=task, task=task,
img=img, img=img,
*args, # imgs=imgs,
**kwargs, # *args,
# **kwargs,
) )
return result
except Exception as e: except Exception as e:
logger.error( logger.error(
f"An error occurred while executing the task: {e}" f"An error occurred while executing the task: {e}"

@ -503,7 +503,12 @@ class SwarmRouter:
""" """
self.swarm = self._create_swarm(task, *args, **kwargs) self.swarm = self._create_swarm(task, *args, **kwargs)
self.conversation = self.swarm.conversation if self.swarm_type == "SequentialWorkflow":
self.conversation = (
self.swarm.agent_rearrange.conversation
)
else:
self.conversation = self.swarm.conversation
if self.list_all_agents is True: if self.list_all_agents is True:
list_all_agents( list_all_agents(

Loading…
Cancel
Save