added streaming callback functionality to sequential workflow

pull/1080/head
Aksh Parekh 4 weeks ago
parent 449fe046e0
commit bb7fd5b79f

@ -38,6 +38,7 @@ class AgentRearrange:
team_awareness: bool = False, team_awareness: bool = False,
time_enabled: bool = False, time_enabled: bool = False,
message_id_on: bool = False, message_id_on: bool = False,
streaming_callback: Optional[Callable[[str], None]] = None,
*args, *args,
**kwargs, **kwargs,
): ):
@ -55,6 +56,7 @@ class AgentRearrange:
self.autosave = autosave self.autosave = autosave
self.time_enabled = time_enabled self.time_enabled = time_enabled
self.message_id_on = message_id_on self.message_id_on = message_id_on
self.streaming_callback = streaming_callback
self.conversation = Conversation( self.conversation = Conversation(
name=f"{self.name}-Conversation", name=f"{self.name}-Conversation",
@ -384,6 +386,9 @@ class AgentRearrange:
for agent_name in agent_names: for agent_name in agent_names:
agent = self.agents[agent_name] agent = self.agents[agent_name]
# Set agent.streaming_on if no streaming_callback
if self.streaming_callback is None:
agent.streaming_on = True
result = agent.run( result = agent.run(
task=self.conversation.get_str(), task=self.conversation.get_str(),
img=img, img=img,
@ -392,6 +397,11 @@ class AgentRearrange:
) )
result = any_to_str(result) result = any_to_str(result)
# Call streaming callback with the result if provided
if self.streaming_callback:
self.streaming_callback(result)
self.conversation.add( self.conversation.add(
agent.agent_name, result agent.agent_name, result
) )
@ -426,6 +436,9 @@ class AgentRearrange:
f"Added sequential awareness for {agent_name}: {awareness_info}" f"Added sequential awareness for {agent_name}: {awareness_info}"
) )
# Set agent.streaming_on if no streaming_callback
if self.streaming_callback is None:
agent.streaming_on = True
current_task = agent.run( current_task = agent.run(
task=self.conversation.get_str(), task=self.conversation.get_str(),
img=img, img=img,
@ -434,6 +447,10 @@ class AgentRearrange:
) )
current_task = any_to_str(current_task) current_task = any_to_str(current_task)
# Call streaming callback with the result if provided
if self.streaming_callback:
self.streaming_callback(current_task)
self.conversation.add( self.conversation.add(
agent.agent_name, current_task agent.agent_name, current_task
) )

@ -49,6 +49,7 @@ class SequentialWorkflow:
shared_memory_system: callable = None, shared_memory_system: callable = None,
multi_agent_collab_prompt: bool = True, multi_agent_collab_prompt: bool = True,
team_awareness: bool = False, team_awareness: bool = False,
streaming_callback: Optional[Callable[[str], None]] = None,
*args, *args,
**kwargs, **kwargs,
): ):
@ -64,6 +65,8 @@ class SequentialWorkflow:
output_type (OutputType, optional): Output format for the workflow. Defaults to "dict". output_type (OutputType, optional): Output format for the workflow. Defaults to "dict".
shared_memory_system (callable, optional): Callable for shared memory management. Defaults to None. shared_memory_system (callable, optional): Callable for shared memory management. Defaults to None.
multi_agent_collab_prompt (bool, optional): If True, appends a collaborative prompt to each agent. multi_agent_collab_prompt (bool, optional): If True, appends a collaborative prompt to each agent.
team_awareness (bool, optional): Whether to enable team awareness. Defaults to False.
streaming_callback (Optional[Callable[[str], None]], optional): Callback function to receive streaming tokens in real-time. Defaults to None.
*args: Additional positional arguments. *args: Additional positional arguments.
**kwargs: Additional keyword arguments. **kwargs: Additional keyword arguments.
@ -79,6 +82,7 @@ class SequentialWorkflow:
self.shared_memory_system = shared_memory_system self.shared_memory_system = shared_memory_system
self.multi_agent_collab_prompt = multi_agent_collab_prompt self.multi_agent_collab_prompt = multi_agent_collab_prompt
self.team_awareness = team_awareness self.team_awareness = team_awareness
self.streaming_callback = streaming_callback
self.reliability_check() self.reliability_check()
self.flow = self.sequential_flow() self.flow = self.sequential_flow()
@ -183,6 +187,9 @@ class SequentialWorkflow:
return self.agent_rearrange.run( return self.agent_rearrange.run(
task=task, task=task,
img=img, img=img,
streaming_callback=self.streaming_callback,
*args,
**kwargs,
) )
except Exception as e: except Exception as e:

Loading…
Cancel
Save