parent
bd0a89705f
commit
0ea39bb905
@ -0,0 +1,62 @@
|
|||||||
|
import asyncio
|
||||||
|
from typing import Any, Callable, List, Optional
|
||||||
|
from swarms.structs.base_workflow import BaseWorkflow
|
||||||
|
from swarms.structs.agent import Agent
|
||||||
|
from swarms.utils.loguru_logger import logger
|
||||||
|
|
||||||
|
class AsyncWorkflow(BaseWorkflow):
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
name: str = "AsyncWorkflow",
|
||||||
|
agents: List[Agent] = None,
|
||||||
|
max_workers: int = 5,
|
||||||
|
dashboard: bool = False,
|
||||||
|
autosave: bool = False,
|
||||||
|
verbose: bool = False,
|
||||||
|
**kwargs
|
||||||
|
):
|
||||||
|
super().__init__(agents=agents, **kwargs)
|
||||||
|
self.name = name
|
||||||
|
self.agents = agents or []
|
||||||
|
self.max_workers = max_workers
|
||||||
|
self.dashboard = dashboard
|
||||||
|
self.autosave = autosave
|
||||||
|
self.verbose = verbose
|
||||||
|
self.task_pool = []
|
||||||
|
self.results = []
|
||||||
|
self.loop = None
|
||||||
|
|
||||||
|
async def _execute_agent_task(self, agent: Agent, task: str) -> Any:
|
||||||
|
"""Execute a single agent task asynchronously"""
|
||||||
|
try:
|
||||||
|
if self.verbose:
|
||||||
|
logger.info(f"Agent {agent.agent_name} processing task: {task}")
|
||||||
|
result = await agent.arun(task)
|
||||||
|
if self.verbose:
|
||||||
|
logger.info(f"Agent {agent.agent_name} completed task")
|
||||||
|
return result
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error in agent {agent.agent_name}: {str(e)}")
|
||||||
|
return str(e)
|
||||||
|
|
||||||
|
async def run(self, task: str) -> List[Any]:
|
||||||
|
"""Run the workflow with all agents processing the task concurrently"""
|
||||||
|
if not self.agents:
|
||||||
|
raise ValueError("No agents provided to the workflow")
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Create tasks for all agents
|
||||||
|
tasks = [self._execute_agent_task(agent, task) for agent in self.agents]
|
||||||
|
|
||||||
|
# Execute all tasks concurrently
|
||||||
|
self.results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
|
|
||||||
|
if self.autosave:
|
||||||
|
# Implement autosave logic here
|
||||||
|
pass
|
||||||
|
|
||||||
|
return self.results
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error in workflow execution: {str(e)}")
|
||||||
|
raise
|
Loading…
Reference in new issue