import asyncio
import os

from dotenv import load_dotenv

# Load environment variables BEFORE reading ANTHROPIC_API_KEY below.
load_dotenv()

from swarms.models import Anthropic
from swarms.structs import Agent
from swarms.structs.rearrange import AgentRearrange

# Streaming-enabled Anthropic model shared by every agent in both demos.
llm = Anthropic(
    anthropic_api_key=os.getenv("ANTHROPIC_API_KEY"), streaming=True
)


async def sequential():
    """Two-agent sequential flow: generate a blog post, then summarize it.

    Returns:
        str: The final output of the last agent in the flow.
    """
    agent1 = Agent(
        agent_name="Blog generator",
        system_prompt="Generate a blog post like stephen king",
        llm=llm,
        dashboard=False,
        streaming_on=True,
    )

    agent2 = Agent(
        agent_name="Summarizer",
        system_prompt="Summarize the blog post",
        llm=llm,
        dashboard=False,
        streaming_on=True,
    )

    # NOTE(review): agents are constructed with `agent_name=`; confirm the
    # Agent class exposes that value as `.name`, which is what the flow and
    # AgentRearrange's agent registry key on.
    flow = f"{agent1.name} -> {agent2.name}"

    agent_rearrange = AgentRearrange(
        [agent1, agent2], flow, verbose=False, logging=False
    )

    result = await agent_rearrange.astream(
        "Generate a short blog post about Muhammad Ali."
    )
    return result

    # LEAVING THIS CALL BELOW FOR COMPARISON with "completion-style" .run() approach ;)
    # await agent_rearrange.run(
    #     "Generate a short blog post about Muhammad Ali."
    # )


async def parallel():
    """Fan-out/fan-in flow: two writers run in parallel, a reviewer picks one.

    Returns:
        str: The reviewer's final output.
    """
    writer1 = Agent(
        agent_name="Writer 1",
        system_prompt=(
            "Generate a blog post in the style of J.K. Rowling"
            " about Muhammad Ali"
        ),
        llm=llm,
        dashboard=False,
    )

    writer2 = Agent(
        agent_name="Writer 2",
        system_prompt=(
            "Generate a blog post in the style of Stephen King"
            " about Muhammad Ali"
        ),
        llm=llm,
        dashboard=False,
    )

    reviewer = Agent(
        agent_name="Reviewer",
        system_prompt=(
            "Select the writer that wrote the best story."
            " There can only be one best story."
        ),
        llm=llm,
        dashboard=False,
    )

    # Comma = run in parallel; "->" = hand combined output to the next agent.
    flow = f"{writer1.name}, {writer2.name} -> {reviewer.name}"

    agent_rearrange = AgentRearrange(
        [writer1, writer2, reviewer], flow, verbose=False, logging=False
    )

    result = await agent_rearrange.astream(
        "Generate a 1 sentence story about Michael Jordan."
    )
    return result

    # LEAVING THIS CALL BELOW FOR COMPARISON with "completion-style" .run() approach ;)
    # result = agent_rearrange.run(
    #     "Generate a short blog post about Michael Jordan."
    # )


if __name__ == "__main__":
    # Guard so importing this example does not fire off API calls.
    asyncio.run(sequential())
    # asyncio.run(parallel())
async def astream_events(
    self, task: str = None, img: str = None, *args, **kwargs
):
    """
    Stream LangChain ``astream_events`` (v1) events from the underlying LLM.

    Only works when ``self.llm`` is a LangChain-based model that implements
    the ``astream_events`` API.

    Args:
        task (str): Prompt forwarded verbatim to the LLM. Defaults to None.
        img (str): Unused here; kept for signature parity with ``run``.

    Yields:
        dict: Raw LangChain event payloads, in arrival order.
    """
    try:
        event_stream = self.llm.astream_events(task, version="v1")
        async for event in event_stream:
            yield event
    except Exception as error:
        # Best-effort: report the failure and end the stream quietly.
        print(f"Error streaming events: {error}")
+ """ + try: + async for evt in self.llm.astream_events(task, version="v1"): + yield evt + except Exception as e: + print(f"Error streaming events: {e}") + def __call__(self, task: str = None, img: str = None, *args, **kwargs): """Call the agent diff --git a/swarms/structs/rearrange.py b/swarms/structs/rearrange.py index a0ca3563..22ca45b4 100644 --- a/swarms/structs/rearrange.py +++ b/swarms/structs/rearrange.py @@ -3,8 +3,8 @@ from typing import Callable, Dict, List, Optional from swarms.memory.base_vectordb import BaseVectorDatabase from swarms.structs.agent import Agent from swarms.structs.base_swarm import BaseSwarm -from swarms.utils.loguru_logger import logger from swarms.structs.omni_agent_types import AgentType +from swarms.utils.loguru_logger import logger class AgentRearrange(BaseSwarm): @@ -266,6 +266,151 @@ class AgentRearrange(BaseSwarm): logger.error(f"An error occurred: {e}") return e + async def astream( + self, + task: str = None, + img: str = None, + custom_tasks: Dict[str, str] = None, + *args, + **kwargs, + ): + """ + Runs the swarm with LangChain's astream_events v1 API enabled. + NOTICE: Be sure to only call this method if you are using LangChain-based models in your swarm. + This is useful for enhancing user experience by providing real-time updates of how each agent + in the swarm is processing the current task. + + Args: + task: The initial prompt (aka task) passed to the first agent(s) in the swarm. + + Returns: + str: The final output generated. + """ + try: + if not self.validate_flow(): + return "Invalid flow configuration." 
+ + tasks = self.flow.split("->") + current_task = task + + # If custom_tasks have the agents name and tasks then combine them + if custom_tasks is not None: + c_agent_name, c_task = next(iter(custom_tasks.items())) + + # Find the position of the custom agent in the tasks list + position = tasks.index(c_agent_name) + + # If there is a prebois agent merge its task with the custom tasks + if position > 0: + tasks[position - 1] += "->" + c_task + else: + # If there is no prevous agent just insert the custom tasks + tasks.insert(position, c_task) + + logger.info("TASK:", task) + + # Set the loop counter + loop_count = 0 + while loop_count < self.max_loops: + for task in tasks: + agent_names = [ + name.strip() for name in task.split(",") + ] + if len(agent_names) > 1: + # Parallel processing + logger.info( + f"Running agents in parallel: {agent_names}" + ) + results = [] + for agent_name in agent_names: + if agent_name == "H": + # Human in the loop intervention + if ( + self.human_in_the_loop + and self.custom_human_in_the_loop + ): + current_task = ( + self.custom_human_in_the_loop( + current_task + ) + ) + else: + current_task = input( + "Enter your response:" + ) + else: + agent = self.agents[agent_name] + result = None + # As the current `swarms` package is using LangChain v0.1 we need to use the v0.1 version of the `astream_events` API + # Below is the link to the `astream_events` spec as outlined in the LangChain v0.1 docs + # https://python.langchain.com/v0.1/docs/expression_language/streaming/#event-reference + # Below is the link to the `astream_events` spec as outlined in the LangChain v0.2 docs + # https://python.langchain.com/v0.2/docs/versions/v0_2/migrating_astream_events/ + async for evt in agent.astream_events( + current_task, version="v1" + ): + # print(evt) # <- useful when building/debugging + if evt["event"] == "on_llm_end": + result = evt["data"]["output"] + print(agent.name, result) + results.append(result) + + current_task = "" + for index, res in 
enumerate(results): + current_task += ( + "# OUTPUT of " + + agent_names[index] + + "" + + res + + "\n\n" + ) + else: + # Sequential processing + logger.info( + f"Running agents sequentially: {agent_names}" + ) + + agent_name = agent_names[0] + if agent_name == "H": + # Human-in-the-loop intervention + if ( + self.human_in_the_loop + and self.custom_human_in_the_loop + ): + current_task = ( + self.custom_human_in_the_loop( + current_task + ) + ) + else: + current_task = input( + "Enter the next task: " + ) + else: + agent = self.agents[agent_name] + result = None + # As the current `swarms` package is using LangChain v0.1 we need to use the v0.1 version of the `astream_events` API + # Below is the link to the `astream_events` spec as outlined in the LangChain v0.1 docs + # https://python.langchain.com/v0.1/docs/expression_language/streaming/#event-reference + # Below is the link to the `astream_events` spec as outlined in the LangChain v0.2 docs + # https://python.langchain.com/v0.2/docs/versions/v0_2/migrating_astream_events/ + async for evt in agent.astream_events( + f"SYSTEM: {agent.system_prompt}\nINPUT:{current_task}", + version="v1", + ): + # print(evt) # <- useful when building/debugging + if evt["event"] == "on_llm_end": + result = evt["data"]["output"] + print(agent.name, "result", result) + current_task = result + + loop_count += 1 + + return current_task + except Exception as e: + logger.error(f"An error occurred: {e}") + return e + def process_agent_or_swarm( self, name: str, task: str, img: str, is_last, *args, **kwargs ):