adding NON-BREAKING/BACKWARDS-COMPATIBLE support for LangChain's v1 astream events to the AgentRearrange & Agent classes PLUS added an example of how to use it

pull/576/head
tad dy 4 months ago
parent 0873d3e8d0
commit 92675ef081

@ -0,0 +1,83 @@
import os
import asyncio
from dotenv import load_dotenv
load_dotenv()
from swarms.structs import Agent
from swarms.models import Anthropic
from swarms.structs.rearrange import AgentRearrange
llm = Anthropic(anthropic_api_key=os.getenv("ANTHROPIC_API_KEY"), streaming=True)
async def sequential():
agent1 = Agent(
agent_name="Blog generator",
system_prompt="Generate a blog post like stephen king",
llm=llm,
dashboard=False,
streaming_on=True
)
agent2 = Agent(
agent_name="Summarizer",
system_prompt="Summarize the blog post",
llm=llm,
dashboard=False,
streaming_on=True
)
flow = f"{agent1.name} -> {agent2.name}"
agent_rearrange = AgentRearrange(
[agent1, agent2], flow, verbose=False, logging=False
)
result = await agent_rearrange.astream(
"Generate a short blog post about Muhammad Ali."
)
# LEAVING THIS CALL BELOW FOR COMPARISON with "completion-style" .run() approach ;)
# await agent_rearrange.run(
# "Generate a short blog post about Muhammad Ali."
# )
async def parallel():
writer1 = Agent(
agent_name="Writer 1",
system_prompt="Generate a blog post in the style of J.K. Rowling about Muhammad Ali",
llm=llm,
dashboard=False,
)
writer2 = Agent(
agent_name="Writer 2",
system_prompt="Generate a blog post in the style of Stephen King about Muhammad Ali",
llm=llm,
dashboard=False
)
reviewer = Agent(
agent_name="Reviewer",
system_prompt="Select the writer that wrote the best story. There can only be one best story.",
llm=llm,
dashboard=False
)
flow = f"{writer1.name}, {writer2.name} -> {reviewer.name}"
agent_rearrange = AgentRearrange(
[writer1, writer2, reviewer], flow, verbose=False, logging=False
)
result = await agent_rearrange.astream("Generate a 1 sentence story about Michael Jordan.")
# LEAVING THIS CALL BELOW FOR COMPARISON with "completion-style" .run() approach ;)
# result = agent_rearrange.run(
# "Generate a short blog post about Michael Jordan."
# )
# asyncio.run(sequential())
asyncio.run(parallel())

@ -713,7 +713,7 @@ class Agent:
# Print # Print
if self.streaming_on is True: if self.streaming_on is True:
response = self.stream_response(response) self.stream_response(response)
else: else:
print(response) print(response)
@ -882,6 +882,15 @@ class Agent:
) )
raise error raise error
async def astream_events(
self, task: str = None, img: str = None, *args, **kwargs
):
try:
async for evt in self.llm.astream_events(task, version="v1"):
yield evt
except Exception as e:
print(f"Error streaming events: {e}")
def __call__(self, task: str = None, img: str = None, *args, **kwargs): def __call__(self, task: str = None, img: str = None, *args, **kwargs):
"""Call the agent """Call the agent

@ -266,6 +266,140 @@ class AgentRearrange(BaseSwarm):
logger.error(f"An error occurred: {e}") logger.error(f"An error occurred: {e}")
return e return e
async def astream(
self,
task: str = None,
img: str = None,
custom_tasks: Dict[str, str] = None,
*args,
**kwargs,
):
"""
Runs the swarm with LangChain's astream_events v1 API enabled.
NOTICE: Be sure to only call this method if you are using LangChain-based models in your swarm.
This is useful for enhancing user experience by providing real-time updates of how each agent
in the swarm is processing the current task.
Args:
task: The initial prompt (aka task) passed to the first agent(s) in the swarm.
Returns:
str: The final output generated.
"""
try:
if not self.validate_flow():
return "Invalid flow configuration."
tasks = self.flow.split("->")
current_task = task
# If custom_tasks have the agents name and tasks then combine them
if custom_tasks is not None:
c_agent_name, c_task = next(iter(custom_tasks.items()))
# Find the position of the custom agent in the tasks list
position = tasks.index(c_agent_name)
# If there is a prebois agent merge its task with the custom tasks
if position > 0:
tasks[position - 1] += "->" + c_task
else:
# If there is no prevous agent just insert the custom tasks
tasks.insert(position, c_task)
logger.info('TASK:', task)
# Set the loop counter
loop_count = 0
while loop_count < self.max_loops:
for task in tasks:
agent_names = [
name.strip() for name in task.split(",")
]
if len(agent_names) > 1:
# Parallel processing
logger.info(
f"Running agents in parallel: {agent_names}"
)
results = []
for agent_name in agent_names:
if agent_name == "H":
# Human in the loop intervention
if (
self.human_in_the_loop
and self.custom_human_in_the_loop
):
current_task = (
self.custom_human_in_the_loop(
current_task
)
)
else:
current_task = input(
"Enter your response:"
)
else:
agent = self.agents[agent_name]
result = None
# As the current `swarms` package is using LangChain v0.1 we need to use the v0.1 version of the `astream_events` API
# Below is the link to the `astream_events` spec as outlined in the LangChain v0.1 docs
# https://python.langchain.com/v0.1/docs/expression_language/streaming/#event-reference
# Below is the link to the `astream_events` spec as outlined in the LangChain v0.2 docs
# https://python.langchain.com/v0.2/docs/versions/v0_2/migrating_astream_events/
async for evt in agent.astream_events(current_task, version="v1"):
# print(evt) # <- useful when building/debugging
if evt['event'] == "on_llm_end":
result = evt['data']['output']
print(agent.name, result)
results.append(result)
current_task = ""
for index,res in enumerate(results):
current_task += "# OUTPUT of " + agent_names[index] + "" + res + "\n\n"
else:
# Sequential processing
logger.info(
f"Running agents sequentially: {agent_names}"
)
agent_name = agent_names[0]
if agent_name == "H":
# Human-in-the-loop intervention
if (
self.human_in_the_loop
and self.custom_human_in_the_loop
):
current_task = (
self.custom_human_in_the_loop(
current_task
)
)
else:
current_task = input(
"Enter the next task: "
)
else:
agent = self.agents[agent_name]
result = None
# As the current `swarms` package is using LangChain v0.1 we need to use the v0.1 version of the `astream_events` API
# Below is the link to the `astream_events` spec as outlined in the LangChain v0.1 docs
# https://python.langchain.com/v0.1/docs/expression_language/streaming/#event-reference
# Below is the link to the `astream_events` spec as outlined in the LangChain v0.2 docs
# https://python.langchain.com/v0.2/docs/versions/v0_2/migrating_astream_events/
async for evt in agent.astream_events(f"SYSTEM: {agent.system_prompt}\nINPUT:{current_task}", version="v1"):
# print(evt) # <- useful when building/debugging
if evt['event'] == "on_llm_end":
result = evt['data']['output']
print(agent.name, 'result', result)
current_task = result
loop_count += 1
return current_task
except Exception as e:
logger.error(f"An error occurred: {e}")
return e
def process_agent_or_swarm( def process_agent_or_swarm(
self, name: str, task: str, img: str, is_last, *args, **kwargs self, name: str, task: str, img: str, is_last, *args, **kwargs
): ):

Loading…
Cancel
Save