diff --git a/examples/rearrange_swarm_with_langchain_v1_astream_events/example.py b/examples/rearrange_swarm_with_langchain_v1_astream_events/example.py new file mode 100644 index 00000000..5bd7bd95 --- /dev/null +++ b/examples/rearrange_swarm_with_langchain_v1_astream_events/example.py @@ -0,0 +1,83 @@ +import os +import asyncio +from dotenv import load_dotenv +load_dotenv() + +from swarms.structs import Agent +from swarms.models import Anthropic +from swarms.structs.rearrange import AgentRearrange + +llm = Anthropic(anthropic_api_key=os.getenv("ANTHROPIC_API_KEY"), streaming=True) + +async def sequential(): + + agent1 = Agent( + agent_name="Blog generator", + system_prompt="Generate a blog post like stephen king", + llm=llm, + dashboard=False, + streaming_on=True + ) + + agent2 = Agent( + agent_name="Summarizer", + system_prompt="Summarize the blog post", + llm=llm, + dashboard=False, + streaming_on=True + ) + + flow = f"{agent1.name} -> {agent2.name}" + + agent_rearrange = AgentRearrange( + [agent1, agent2], flow, verbose=False, logging=False + ) + + result = await agent_rearrange.astream( + "Generate a short blog post about Muhammad Ali." + ) + + # LEAVING THIS CALL BELOW FOR COMPARISON with "completion-style" .run() approach ;) + # await agent_rearrange.run( + # "Generate a short blog post about Muhammad Ali." + # ) + +async def parallel(): + + writer1 = Agent( + agent_name="Writer 1", + system_prompt="Generate a blog post in the style of J.K. Rowling about Muhammad Ali", + llm=llm, + dashboard=False, + ) + + writer2 = Agent( + agent_name="Writer 2", + system_prompt="Generate a blog post in the style of Stephen King about Muhammad Ali", + llm=llm, + dashboard=False + ) + + reviewer = Agent( + agent_name="Reviewer", + system_prompt="Select the writer that wrote the best story. There can only be one best story.", + llm=llm, + dashboard=False + ) + + flow = f"{writer1.name}, {writer2.name} -> {reviewer.name}" + + agent_rearrange = AgentRearrange( + [writer1, writer2, reviewer], flow, verbose=False, logging=False + ) + + result = await agent_rearrange.astream("Generate a 1 sentence story about Michael Jordan.") + + # LEAVING THIS CALL BELOW FOR COMPARISON with "completion-style" .run() approach ;) + # result = agent_rearrange.run( + # "Generate a short blog post about Michael Jordan." + # ) + +# asyncio.run(sequential()) +asyncio.run(parallel()) + diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 9173c4fb..dd8fc0c0 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -713,7 +713,7 @@ class Agent: # Print if self.streaming_on is True: - response = self.stream_response(response) + self.stream_response(response) else: print(response) @@ -882,6 +882,15 @@ class Agent: ) raise error + async def astream_events( + self, task: str = None, img: str = None, *args, **kwargs + ): + try: + async for evt in self.llm.astream_events(task, version="v1"): + yield evt + except Exception as e: + print(f"Error streaming events: {e}") + def __call__(self, task: str = None, img: str = None, *args, **kwargs): """Call the agent diff --git a/swarms/structs/rearrange.py b/swarms/structs/rearrange.py index a0ca3563..95a24a2b 100644 --- a/swarms/structs/rearrange.py +++ b/swarms/structs/rearrange.py @@ -266,6 +266,140 @@ class AgentRearrange(BaseSwarm): logger.error(f"An error occurred: {e}") return e + async def astream( + self, + task: str = None, + img: str = None, + custom_tasks: Dict[str, str] = None, + *args, + **kwargs, + ): + """ + Runs the swarm with LangChain's astream_events v1 API enabled. + NOTICE: Be sure to only call this method if you are using LangChain-based models in your swarm. + This is useful for enhancing user experience by providing real-time updates of how each agent + in the swarm is processing the current task. + + Args: + task: The initial prompt (aka task) passed to the first agent(s) in the swarm. + + Returns: + str: The final output generated. + """ + try: + if not self.validate_flow(): + return "Invalid flow configuration." + + tasks = self.flow.split("->") + current_task = task + + # If custom_tasks have the agents name and tasks then combine them + if custom_tasks is not None: + c_agent_name, c_task = next(iter(custom_tasks.items())) + + # Find the position of the custom agent in the tasks list + position = tasks.index(c_agent_name) + + # If there is a prebois agent merge its task with the custom tasks + if position > 0: + tasks[position - 1] += "->" + c_task + else: + # If there is no prevous agent just insert the custom tasks + tasks.insert(position, c_task) + + logger.info('TASK:', task) + + # Set the loop counter + loop_count = 0 + while loop_count < self.max_loops: + for task in tasks: + agent_names = [ + name.strip() for name in task.split(",") + ] + if len(agent_names) > 1: + # Parallel processing + logger.info( + f"Running agents in parallel: {agent_names}" + ) + results = [] + for agent_name in agent_names: + if agent_name == "H": + # Human in the loop intervention + if ( + self.human_in_the_loop + and self.custom_human_in_the_loop + ): + current_task = ( + self.custom_human_in_the_loop( + current_task + ) + ) + else: + current_task = input( + "Enter your response:" + ) + else: + agent = self.agents[agent_name] + result = None + # As the current `swarms` package is using LangChain v0.1 we need to use the v0.1 version of the `astream_events` API + # Below is the link to the `astream_events` spec as outlined in the LangChain v0.1 docs + # https://python.langchain.com/v0.1/docs/expression_language/streaming/#event-reference + # Below is the link to the `astream_events` spec as outlined in the LangChain v0.2 docs + # https://python.langchain.com/v0.2/docs/versions/v0_2/migrating_astream_events/ + async for evt in agent.astream_events(current_task, version="v1"): + # print(evt) # <- useful when building/debugging + if evt['event'] == "on_llm_end": + result = evt['data']['output'] + print(agent.name, result) + results.append(result) + + current_task = "" + for index,res in enumerate(results): + current_task += "# OUTPUT of " + agent_names[index] + "" + res + "\n\n" + else: + # Sequential processing + logger.info( + f"Running agents sequentially: {agent_names}" + ) + + agent_name = agent_names[0] + if agent_name == "H": + # Human-in-the-loop intervention + if ( + self.human_in_the_loop + and self.custom_human_in_the_loop + ): + current_task = ( + self.custom_human_in_the_loop( + current_task + ) + ) + else: + current_task = input( + "Enter the next task: " + ) + else: + agent = self.agents[agent_name] + result = None + # As the current `swarms` package is using LangChain v0.1 we need to use the v0.1 version of the `astream_events` API + # Below is the link to the `astream_events` spec as outlined in the LangChain v0.1 docs + # https://python.langchain.com/v0.1/docs/expression_language/streaming/#event-reference + # Below is the link to the `astream_events` spec as outlined in the LangChain v0.2 docs + # https://python.langchain.com/v0.2/docs/versions/v0_2/migrating_astream_events/ + async for evt in agent.astream_events(f"SYSTEM: {agent.system_prompt}\nINPUT:{current_task}", version="v1"): + # print(evt) # <- useful when building/debugging + if evt['event'] == "on_llm_end": + result = evt['data']['output'] + print(agent.name, 'result', result) + current_task = result + + loop_count += 1 + + return current_task + except Exception as e: + logger.error(f"An error occurred: {e}") + return e + def process_agent_or_swarm( self, name: str, task: str, img: str, is_last, *args, **kwargs ):