pull/1080/head
Aksh Parekh 4 weeks ago
parent 42f0584a55
commit 57a030fb19

@ -38,7 +38,6 @@ class AgentRearrange:
team_awareness: bool = False, team_awareness: bool = False,
time_enabled: bool = False, time_enabled: bool = False,
message_id_on: bool = False, message_id_on: bool = False,
streaming_callback: Optional[Callable[[str], None]] = None,
*args, *args,
**kwargs, **kwargs,
): ):
@ -56,7 +55,6 @@ class AgentRearrange:
self.autosave = autosave self.autosave = autosave
self.time_enabled = time_enabled self.time_enabled = time_enabled
self.message_id_on = message_id_on self.message_id_on = message_id_on
self.streaming_callback = streaming_callback
self.conversation = Conversation( self.conversation = Conversation(
name=f"{self.name}-Conversation", name=f"{self.name}-Conversation",
@ -386,9 +384,6 @@ class AgentRearrange:
for agent_name in agent_names: for agent_name in agent_names:
agent = self.agents[agent_name] agent = self.agents[agent_name]
# Set agent.streaming_on if no streaming_callback
if self.streaming_callback is not None:
agent.streaming_on = True
result = agent.run( result = agent.run(
task=self.conversation.get_str(), task=self.conversation.get_str(),
img=img, img=img,
@ -397,11 +392,6 @@ class AgentRearrange:
) )
result = any_to_str(result) result = any_to_str(result)
# Call streaming callback with the result if provided
if self.streaming_callback:
self.streaming_callback(result)
self.conversation.add( self.conversation.add(
agent.agent_name, result agent.agent_name, result
) )
@ -436,9 +426,6 @@ class AgentRearrange:
f"Added sequential awareness for {agent_name}: {awareness_info}" f"Added sequential awareness for {agent_name}: {awareness_info}"
) )
# Set agent.streaming_on if no streaming_callback
if self.streaming_callback is not None:
agent.streaming_on = True
current_task = agent.run( current_task = agent.run(
task=self.conversation.get_str(), task=self.conversation.get_str(),
img=img, img=img,
@ -447,10 +434,6 @@ class AgentRearrange:
) )
current_task = any_to_str(current_task) current_task = any_to_str(current_task)
# Call streaming callback with the result if provided
if self.streaming_callback:
self.streaming_callback(current_task)
self.conversation.add( self.conversation.add(
agent.agent_name, current_task agent.agent_name, current_task
) )
@ -720,4 +703,4 @@ def rearrange(
*args, *args,
**kwargs, **kwargs,
) )
return agent_system.run(task=task, img=img) return agent_system.run(task=task, img=img)

@ -49,7 +49,6 @@ class SequentialWorkflow:
shared_memory_system: callable = None, shared_memory_system: callable = None,
multi_agent_collab_prompt: bool = True, multi_agent_collab_prompt: bool = True,
team_awareness: bool = False, team_awareness: bool = False,
streaming_callback: Optional[Callable[[str], None]] = None,
*args, *args,
**kwargs, **kwargs,
): ):
@ -65,8 +64,6 @@ class SequentialWorkflow:
output_type (OutputType, optional): Output format for the workflow. Defaults to "dict". output_type (OutputType, optional): Output format for the workflow. Defaults to "dict".
shared_memory_system (callable, optional): Callable for shared memory management. Defaults to None. shared_memory_system (callable, optional): Callable for shared memory management. Defaults to None.
multi_agent_collab_prompt (bool, optional): If True, appends a collaborative prompt to each agent. multi_agent_collab_prompt (bool, optional): If True, appends a collaborative prompt to each agent.
team_awareness (bool, optional): Whether to enable team awareness. Defaults to False.
streaming_callback (Optional[Callable[[str], None]], optional): Callback function to receive streaming tokens in real-time. Defaults to None.
*args: Additional positional arguments. *args: Additional positional arguments.
**kwargs: Additional keyword arguments. **kwargs: Additional keyword arguments.
@ -82,7 +79,6 @@ class SequentialWorkflow:
self.shared_memory_system = shared_memory_system self.shared_memory_system = shared_memory_system
self.multi_agent_collab_prompt = multi_agent_collab_prompt self.multi_agent_collab_prompt = multi_agent_collab_prompt
self.team_awareness = team_awareness self.team_awareness = team_awareness
self.streaming_callback = streaming_callback
self.reliability_check() self.reliability_check()
self.flow = self.sequential_flow() self.flow = self.sequential_flow()
@ -95,7 +91,6 @@ class SequentialWorkflow:
max_loops=self.max_loops, max_loops=self.max_loops,
output_type=self.output_type, output_type=self.output_type,
team_awareness=self.team_awareness, team_awareness=self.team_awareness,
streaming_callback=self.streaming_callback,
*args, *args,
**kwargs, **kwargs,
) )
@ -188,9 +183,6 @@ class SequentialWorkflow:
return self.agent_rearrange.run( return self.agent_rearrange.run(
task=task, task=task,
img=img, img=img,
streaming_callback=self.streaming_callback,
*args,
**kwargs,
) )
except Exception as e: except Exception as e:
@ -304,4 +296,4 @@ class SequentialWorkflow:
logger.error( logger.error(
f"An error occurred while executing the batch of tasks concurrently: {e}" f"An error occurred while executing the batch of tasks concurrently: {e}"
) )
raise raise

@ -0,0 +1,88 @@
from swarms.structs.agent import Agent
from swarms.structs.sequential_workflow import SequentialWorkflow
def run_workflow_with_streaming_callback(task, streaming_callback):
"""
Run a sequential workflow with two agents and a streaming callback.
Args:
task (str): The task to process through the workflow.
streaming_callback (callable): Function to handle streaming output.
Returns:
The final result from the workflow.
"""
agent1 = Agent(
name="Research Agent",
description="A research agent that can answer questions",
model_name="gpt-4o",
system_prompt=(
"You are a ResearchAgent. Your task is to research and gather "
"information about the given topic. Provide comprehensive research "
"findings and key insights."
),
max_loops=1,
interactive=True,
verbose=True,
)
agent2 = Agent(
name="Analysis Agent",
description="An analysis agent that draws conclusions from research",
model_name="gpt-4o-mini",
system_prompt=(
"You are an AnalysisAgent. Your task is to analyze the research "
"provided by the previous agent and draw meaningful conclusions. "
"Provide detailed analysis and actionable insights."
),
max_loops=1,
interactive=True,
verbose=True,
)
workflow = SequentialWorkflow(
id="research_analysis_workflow",
name="Research Analysis Workflow",
description="A sequential workflow that researches and analyzes topics",
agents=[agent1, agent2],
max_loops=1,
output_type="str",
streaming_callback=streaming_callback,
multi_agent_collab_prompt=True,
)
return workflow.run(task)
if __name__ == "__main__":
# ## REGULAR STREAMING CALLBACK
# def streaming_callback(token):
# print(token, end="", flush=True)
# run_workflow_with_streaming_callback(
# task="What are the latest advancements in AI?",
# streaming_callback=streaming_callback,
# )
# ## CUSTOM BUFFERING STREAMING_CALLBACK BASED ON DEV PREFERED
# buffer = []
# def streaming_callback(token):
# buffer.append(token)
# # Print in bigger chunks (e.g., every 20 tokens or on final flush)
# if len(buffer) >= 20 or token.endswith("\n"):
# print("".join(buffer), end="", flush=True)
# buffer.clear()
# # Optionally, you could add a flush at the end of the run if needed
# run_workflow_with_streaming_callback(
# task="What are the latest advancements in AI?",
# streaming_callback=streaming_callback,
# )
## NO ADDED STREAMING_CALLBACK
run_workflow_with_streaming_callback(
task="What are the latest advancements in AI?",
streaming_callback=None,
)
Loading…
Cancel
Save