From 92675ef0815c8cd794f60b49d4dd247c250b6b26 Mon Sep 17 00:00:00 2001 From: tad dy Date: Sat, 31 Aug 2024 15:55:24 +0000 Subject: [PATCH 1/4] adding NON-BREAKING/BACKWARDS-COMPATIBLE support for LangChain's v1 astream events to the AgentRearrange & Agent classes PLUS added an example of how to use it --- .../example.py | 83 +++++++++++ swarms/structs/agent.py | 11 +- swarms/structs/rearrange.py | 134 ++++++++++++++++++ 3 files changed, 227 insertions(+), 1 deletion(-) create mode 100644 examples/rearrange_swarm_with_langchain_v1_astream_events/example.py diff --git a/examples/rearrange_swarm_with_langchain_v1_astream_events/example.py b/examples/rearrange_swarm_with_langchain_v1_astream_events/example.py new file mode 100644 index 00000000..5bd7bd95 --- /dev/null +++ b/examples/rearrange_swarm_with_langchain_v1_astream_events/example.py @@ -0,0 +1,83 @@ +import os +import asyncio +from dotenv import load_dotenv +load_dotenv() + +from swarms.structs import Agent +from swarms.models import Anthropic +from swarms.structs.rearrange import AgentRearrange + +llm = Anthropic(anthropic_api_key=os.getenv("ANTHROPIC_API_KEY"), streaming=True) + +async def sequential(): + + agent1 = Agent( + agent_name="Blog generator", + system_prompt="Generate a blog post like stephen king", + llm=llm, + dashboard=False, + streaming_on=True + ) + + agent2 = Agent( + agent_name="Summarizer", + system_prompt="Summarize the blog post", + llm=llm, + dashboard=False, + streaming_on=True + ) + + flow = f"{agent1.name} -> {agent2.name}" + + agent_rearrange = AgentRearrange( + [agent1, agent2], flow, verbose=False, logging=False + ) + + result = await agent_rearrange.astream( + "Generate a short blog post about Muhammad Ali." + ) + + # LEAVING THIS CALL BELOW FOR COMPARISON with "completion-style" .run() approach ;) + # await agent_rearrange.run( + # "Generate a short blog post about Muhammad Ali." + # ) + +async def parallel(): + + writer1 = Agent( + agent_name="Writer 1", + system_prompt="Generate a blog post in the style of J.K. Rowling about Muhammad Ali", + llm=llm, + dashboard=False, + ) + + writer2 = Agent( + agent_name="Writer 2", + system_prompt="Generate a blog post in the style of Stephen King about Muhammad Ali", + llm=llm, + dashboard=False + ) + + reviewer = Agent( + agent_name="Reviewer", + system_prompt="Select the writer that wrote the best story. There can only be one best story.", + llm=llm, + dashboard=False + ) + + flow = f"{writer1.name}, {writer2.name} -> {reviewer.name}" + + agent_rearrange = AgentRearrange( + [writer1, writer2, reviewer], flow, verbose=False, logging=False + ) + + result = await agent_rearrange.astream("Generate a 1 sentence story about Michael Jordan.") + + # LEAVING THIS CALL BELOW FOR COMPARISON with "completion-style" .run() approach ;) + # result = agent_rearrange.run( + # "Generate a short blog post about Michael Jordan." + # ) + +# asyncio.run(sequential()) +asyncio.run(parallel()) + diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 9173c4fb..dd8fc0c0 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -713,7 +713,7 @@ class Agent: # Print if self.streaming_on is True: - response = self.stream_response(response) + self.stream_response(response) else: print(response) @@ -882,6 +882,15 @@ class Agent: ) raise error + async def astream_events( + self, task: str = None, img: str = None, *args, **kwargs + ): + try: + async for evt in self.llm.astream_events(task, version="v1"): + yield evt + except Exception as e: + print(f"Error streaming events: {e}") + def __call__(self, task: str = None, img: str = None, *args, **kwargs): """Call the agent diff --git a/swarms/structs/rearrange.py b/swarms/structs/rearrange.py index a0ca3563..95a24a2b 100644 --- a/swarms/structs/rearrange.py +++ b/swarms/structs/rearrange.py @@ -266,6 +266,140 @@ class AgentRearrange(BaseSwarm): logger.error(f"An error occurred: {e}") return e + async def astream( + self, + task: str = None, + img: str = None, + custom_tasks: Dict[str, str] = None, + *args, + **kwargs, + ): + """ + Runs the swarm with LangChain's astream_events v1 API enabled. + NOTICE: Be sure to only call this method if you are using LangChain-based models in your swarm. + This is useful for enhancing user experience by providing real-time updates of how each agent + in the swarm is processing the current task. + + Args: + task: The initial prompt (aka task) passed to the first agent(s) in the swarm. + + Returns: + str: The final output generated. + """ + try: + if not self.validate_flow(): + return "Invalid flow configuration." + + tasks = self.flow.split("->") + current_task = task + + # If custom_tasks have the agents name and tasks then combine them + if custom_tasks is not None: + c_agent_name, c_task = next(iter(custom_tasks.items())) + + # Find the position of the custom agent in the tasks list + position = tasks.index(c_agent_name) + + # If there is a prebois agent merge its task with the custom tasks + if position > 0: + tasks[position - 1] += "->" + c_task + else: + # If there is no prevous agent just insert the custom tasks + tasks.insert(position, c_task) + + logger.info('TASK:', task) + + # Set the loop counter + loop_count = 0 + while loop_count < self.max_loops: + for task in tasks: + agent_names = [ + name.strip() for name in task.split(",") + ] + if len(agent_names) > 1: + # Parallel processing + logger.info( + f"Running agents in parallel: {agent_names}" + ) + results = [] + for agent_name in agent_names: + if agent_name == "H": + # Human in the loop intervention + if ( + self.human_in_the_loop + and self.custom_human_in_the_loop + ): + current_task = ( + self.custom_human_in_the_loop( + current_task + ) + ) + else: + current_task = input( + "Enter your response:" + ) + else: + agent = self.agents[agent_name] + result = None + # As the current `swarms` package is using LangChain v0.1 we need to use the v0.1 version of the `astream_events` API + # Below is the link to the `astream_events` spec as outlined in the LangChain v0.1 docs + # https://python.langchain.com/v0.1/docs/expression_language/streaming/#event-reference + # Below is the link to the `astream_events` spec as outlined in the LangChain v0.2 docs + # https://python.langchain.com/v0.2/docs/versions/v0_2/migrating_astream_events/ + async for evt in agent.astream_events(current_task, version="v1"): + # print(evt) # <- useful when building/debugging + if evt['event'] == "on_llm_end": + result = evt['data']['output'] + print(agent.name, result) + results.append(result) + + current_task = "" + for index,res in enumerate(results): + current_task += "# OUTPUT of " + agent_names[index] + "" + res + "\n\n" + else: + # Sequential processing + logger.info( + f"Running agents sequentially: {agent_names}" + ) + + agent_name = agent_names[0] + if agent_name == "H": + # Human-in-the-loop intervention + if ( + self.human_in_the_loop + and self.custom_human_in_the_loop + ): + current_task = ( + self.custom_human_in_the_loop( + current_task + ) + ) + else: + current_task = input( + "Enter the next task: " + ) + else: + agent = self.agents[agent_name] + result = None + # As the current `swarms` package is using LangChain v0.1 we need to use the v0.1 version of the `astream_events` API + # Below is the link to the `astream_events` spec as outlined in the LangChain v0.1 docs + # https://python.langchain.com/v0.1/docs/expression_language/streaming/#event-reference + # Below is the link to the `astream_events` spec as outlined in the LangChain v0.2 docs + # https://python.langchain.com/v0.2/docs/versions/v0_2/migrating_astream_events/ + async for evt in agent.astream_events(f"SYSTEM: {agent.system_prompt}\nINPUT:{current_task}", version="v1"): + # print(evt) # <- useful when building/debugging + if evt['event'] == "on_llm_end": + result = evt['data']['output'] + print(agent.name, 'result', result) + current_task = result + + loop_count += 1 + + return current_task + except Exception as e: + logger.error(f"An error occurred: {e}") + return e + def process_agent_or_swarm( self, name: str, task: str, img: str, is_last, *args, **kwargs ): From 8a64e1288504cbe92cddb499230a2e064bf5c30c Mon Sep 17 00:00:00 2001 From: tad dy Date: Sat, 31 Aug 2024 16:08:23 +0000 Subject: [PATCH 2/4] Adding documentation comments to the newly added astream_events method on the Agent class --- swarms/structs/agent.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index dd8fc0c0..7e553a8e 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -885,6 +885,10 @@ class Agent: async def astream_events( self, task: str = None, img: str = None, *args, **kwargs ): + """ + Run the Agent with LangChain's astream_events API. + Only works with LangChain-based models. + """ try: async for evt in self.llm.astream_events(task, version="v1"): yield evt From 8c312c2df24cfbd0467185384ee393c71977f1ee Mon Sep 17 00:00:00 2001 From: tad dy Date: Sat, 31 Aug 2024 16:27:59 +0000 Subject: [PATCH 3/4] adding changes suggested by black linter --- swarms/structs/rearrange.py | 39 ++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/swarms/structs/rearrange.py b/swarms/structs/rearrange.py index 95a24a2b..22ca45b4 100644 --- a/swarms/structs/rearrange.py +++ b/swarms/structs/rearrange.py @@ -3,8 +3,8 @@ from typing import Callable, Dict, List, Optional from swarms.memory.base_vectordb import BaseVectorDatabase from swarms.structs.agent import Agent from swarms.structs.base_swarm import BaseSwarm -from swarms.utils.loguru_logger import logger from swarms.structs.omni_agent_types import AgentType +from swarms.utils.loguru_logger import logger class AgentRearrange(BaseSwarm): @@ -307,7 +307,7 @@ class AgentRearrange(BaseSwarm): # If there is no prevous agent just insert the custom tasks tasks.insert(position, c_task) - logger.info('TASK:', task) + logger.info("TASK:", task) # Set the loop counter loop_count = 0 @@ -341,21 +341,29 @@ class AgentRearrange(BaseSwarm): else: agent = self.agents[agent_name] result = None - # As the current `swarms` package is using LangChain v0.1 we need to use the v0.1 version of the `astream_events` API + # As the current `swarms` package is using LangChain v0.1 we need to use the v0.1 version of the `astream_events` API # Below is the link to the `astream_events` spec as outlined in the LangChain v0.1 docs # https://python.langchain.com/v0.1/docs/expression_language/streaming/#event-reference # Below is the link to the `astream_events` spec as outlined in the LangChain v0.2 docs # https://python.langchain.com/v0.2/docs/versions/v0_2/migrating_astream_events/ - async for evt in agent.astream_events(current_task, version="v1"): + async for evt in agent.astream_events( + current_task, version="v1" + ): # print(evt) # <- useful when building/debugging - if evt['event'] == "on_llm_end": - result = evt['data']['output'] + if evt["event"] == "on_llm_end": + result = evt["data"]["output"] print(agent.name, result) results.append(result) current_task = "" - for index,res in enumerate(results): - current_task += "# OUTPUT of " + agent_names[index] + "" + res + "\n\n" + for index, res in enumerate(results): + current_task += ( + "# OUTPUT of " + + agent_names[index] + + "" + + res + + "\n\n" + ) else: # Sequential processing logger.info( @@ -381,18 +389,21 @@ class AgentRearrange(BaseSwarm): else: agent = self.agents[agent_name] result = None - # As the current `swarms` package is using LangChain v0.1 we need to use the v0.1 version of the `astream_events` API + # As the current `swarms` package is using LangChain v0.1 we need to use the v0.1 version of the `astream_events` API # Below is the link to the `astream_events` spec as outlined in the LangChain v0.1 docs # https://python.langchain.com/v0.1/docs/expression_language/streaming/#event-reference # Below is the link to the `astream_events` spec as outlined in the LangChain v0.2 docs # https://python.langchain.com/v0.2/docs/versions/v0_2/migrating_astream_events/ - async for evt in agent.astream_events(f"SYSTEM: {agent.system_prompt}\nINPUT:{current_task}", version="v1"): + async for evt in agent.astream_events( + f"SYSTEM: {agent.system_prompt}\nINPUT:{current_task}", + version="v1", + ): # print(evt) # <- useful when building/debugging - if evt['event'] == "on_llm_end": - result = evt['data']['output'] - print(agent.name, 'result', result) + if evt["event"] == "on_llm_end": + result = evt["data"]["output"] + print(agent.name, "result", result) current_task = result - + loop_count += 1 return current_task From 10b92ec6c30347186c06508aa443299d38fa5cec Mon Sep 17 00:00:00 2001 From: tad dy Date: Sat, 31 Aug 2024 17:40:05 +0000 Subject: [PATCH 4/4] reorganizing placement of rearrange_swarm example in examples folder --- .../rearrange}/example.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) rename examples/{rearrange_swarm_with_langchain_v1_astream_events => swarms/rearrange}/example.py (97%) diff --git a/examples/rearrange_swarm_with_langchain_v1_astream_events/example.py b/examples/swarms/rearrange/example.py similarity index 97% rename from examples/rearrange_swarm_with_langchain_v1_astream_events/example.py rename to examples/swarms/rearrange/example.py index 5bd7bd95..930188db 100644 --- a/examples/rearrange_swarm_with_langchain_v1_astream_events/example.py +++ b/examples/swarms/rearrange/example.py @@ -78,6 +78,6 @@ async def parallel(): # "Generate a short blog post about Michael Jordan." # ) -# asyncio.run(sequential()) -asyncio.run(parallel()) +asyncio.run(sequential()) +# asyncio.run(parallel())