|
|
|
@ -2,9 +2,12 @@ import asyncio
|
|
|
|
|
import concurrent.futures
|
|
|
|
|
import time
|
|
|
|
|
from abc import ABC, abstractmethod
|
|
|
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
|
from typing import Any, Dict, List, Optional, Callable
|
|
|
|
|
from swarms.structs.agent import Agent
|
|
|
|
|
from swarms.agents.base import AbstractWorker
|
|
|
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
|
import logging
|
|
|
|
|
from termcolor import colored
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AbstractSwarm(ABC):
|
|
|
|
@ -360,5 +363,83 @@ class AbstractSwarm(ABC):
|
|
|
|
|
# Assign task to agent by their agent id
|
|
|
|
|
agent = self.select_agent(agent_id)
|
|
|
|
|
return agent.run(task, *args, **kwargs)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def task_assignment_by_name(
|
|
|
|
|
self, task: str, agent_name: str, *args, **kwargs
|
|
|
|
|
):
|
|
|
|
|
"""
|
|
|
|
|
Assign a task to an agent
|
|
|
|
|
"""
|
|
|
|
|
# Assign task to agent by their agent id
|
|
|
|
|
agent = self.select_agent_by_name(agent_name)
|
|
|
|
|
return agent.run(task, *args, **kwargs)
|
|
|
|
|
|
|
|
|
|
def concurrent_run(self, task: str) -> List[str]:
|
|
|
|
|
"""Synchronously run the task on all llms and collect responses"""
|
|
|
|
|
with ThreadPoolExecutor() as executor:
|
|
|
|
|
future_to_llm = {
|
|
|
|
|
executor.submit(agent, task): agent
|
|
|
|
|
for agent in self.agents
|
|
|
|
|
}
|
|
|
|
|
responses = []
|
|
|
|
|
for future in as_completed(future_to_llm):
|
|
|
|
|
try:
|
|
|
|
|
responses.append(future.result())
|
|
|
|
|
except Exception as error:
|
|
|
|
|
print(
|
|
|
|
|
f"{future_to_llm[future]} generated an"
|
|
|
|
|
f" exception: {error}"
|
|
|
|
|
)
|
|
|
|
|
self.last_responses = responses
|
|
|
|
|
self.task_history.append(task)
|
|
|
|
|
return responses
|
|
|
|
|
|
|
|
|
|
def add_llm(self, agent: Callable):
|
|
|
|
|
"""Add an llm to the god mode"""
|
|
|
|
|
self.agents.append(agent)
|
|
|
|
|
|
|
|
|
|
def remove_llm(self, agent: Callable):
|
|
|
|
|
"""Remove an llm from the god mode"""
|
|
|
|
|
self.agents.remove(agent)
|
|
|
|
|
|
|
|
|
|
def add_agent(self, agent: Agent = None, *args, **kwargs):
|
|
|
|
|
"""Add an agent to the swarm
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
agent (Agent, optional): _description_. Defaults to None.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
_type_: _description_
|
|
|
|
|
"""
|
|
|
|
|
self.agents.append(agent)
|
|
|
|
|
return agent
|
|
|
|
|
|
|
|
|
|
def run_all(self, task: str = None, *args, **kwargs):
|
|
|
|
|
"""Run all agents
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
task (str, optional): _description_. Defaults to None.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
_type_: _description_
|
|
|
|
|
"""
|
|
|
|
|
responses = []
|
|
|
|
|
for agent in self.agents:
|
|
|
|
|
responses.append(agent(task, *args, **kwargs))
|
|
|
|
|
return responses
|
|
|
|
|
|
|
|
|
|
def run_on_all_agents(self, task: str = None, *args, **kwargs):
|
|
|
|
|
"""Run on all agents
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
task (str, optional): _description_. Defaults to None.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
_type_: _description_
|
|
|
|
|
"""
|
|
|
|
|
with ThreadPoolExecutor() as executor:
|
|
|
|
|
responses = executor.map(
|
|
|
|
|
lambda agent: agent(task, *args, **kwargs),
|
|
|
|
|
self.agents,
|
|
|
|
|
)
|
|
|
|
|
return list(responses)
|
|
|
|
|