fix streaming in ConcurrentWorkflow !

pull/978/head
harshalmore31 3 days ago
parent 27a57c1f09
commit 69aced2bf8

@ -996,6 +996,7 @@ class Agent:
self, self,
task: Optional[Union[str, Any]] = None, task: Optional[Union[str, Any]] = None,
img: Optional[str] = None, img: Optional[str] = None,
streaming_callback: Optional[Callable[[str], None]] = None,
*args, *args,
**kwargs, **kwargs,
) -> Any: ) -> Any:
@ -1077,6 +1078,7 @@ class Agent:
task=task_prompt, task=task_prompt,
img=img, img=img,
current_loop=loop_count, current_loop=loop_count,
streaming_callback=streaming_callback,
*args, *args,
**kwargs, **kwargs,
) )
@ -1084,6 +1086,7 @@ class Agent:
response = self.call_llm( response = self.call_llm(
task=task_prompt, task=task_prompt,
current_loop=loop_count, current_loop=loop_count,
streaming_callback=streaming_callback,
*args, *args,
**kwargs, **kwargs,
) )
@ -2470,6 +2473,7 @@ class Agent:
task: str, task: str,
img: Optional[str] = None, img: Optional[str] = None,
current_loop: int = 0, current_loop: int = 0,
streaming_callback: Optional[Callable[[str], None]] = None,
*args, *args,
**kwargs, **kwargs,
) -> str: ) -> str:
@ -2480,6 +2484,7 @@ class Agent:
task (str): The task to be performed by the `llm` object. task (str): The task to be performed by the `llm` object.
img (str, optional): Path or URL to an image file. img (str, optional): Path or URL to an image file.
audio (str, optional): Path or URL to an audio file. audio (str, optional): Path or URL to an audio file.
streaming_callback (Optional[Callable[[str], None]]): Callback function to receive streaming tokens in real-time.
*args: Variable length argument list. *args: Variable length argument list.
**kwargs: Arbitrary keyword arguments. **kwargs: Arbitrary keyword arguments.
@ -2515,8 +2520,24 @@ class Agent:
if hasattr( if hasattr(
streaming_response, "__iter__" streaming_response, "__iter__"
) and not isinstance(streaming_response, str): ) and not isinstance(streaming_response, str):
# Check if streaming_callback is provided (for ConcurrentWorkflow dashboard integration)
if streaming_callback is not None:
# Real-time callback streaming for dashboard integration
chunks = []
for chunk in streaming_response:
if (
hasattr(chunk, "choices")
and chunk.choices[0].delta.content
):
content = chunk.choices[
0
].delta.content
chunks.append(content)
# Call the streaming callback with the new chunk
streaming_callback(content)
complete_response = "".join(chunks)
# Check print_on parameter for different streaming behaviors # Check print_on parameter for different streaming behaviors
if self.print_on is False: elif self.print_on is False:
# Silent streaming - no printing, just collect chunks # Silent streaming - no printing, just collect chunks
chunks = [] chunks = []
for chunk in streaming_response: for chunk in streaming_response:
@ -2599,6 +2620,7 @@ class Agent:
img: Optional[str] = None, img: Optional[str] = None,
imgs: Optional[List[str]] = None, imgs: Optional[List[str]] = None,
correct_answer: Optional[str] = None, correct_answer: Optional[str] = None,
streaming_callback: Optional[Callable[[str], None]] = None,
*args, *args,
**kwargs, **kwargs,
) -> Any: ) -> Any:
@ -2613,6 +2635,7 @@ class Agent:
task (Optional[str], optional): The task to be executed. Defaults to None. task (Optional[str], optional): The task to be executed. Defaults to None.
img (Optional[str], optional): The image to be processed. Defaults to None. img (Optional[str], optional): The image to be processed. Defaults to None.
imgs (Optional[List[str]], optional): The list of images to be processed. Defaults to None. imgs (Optional[List[str]], optional): The list of images to be processed. Defaults to None.
streaming_callback (Optional[Callable[[str], None]], optional): Callback function to receive streaming tokens in real-time. Defaults to None.
*args: Additional positional arguments to be passed to the execution method. *args: Additional positional arguments to be passed to the execution method.
**kwargs: Additional keyword arguments to be passed to the execution method. **kwargs: Additional keyword arguments to be passed to the execution method.
@ -2644,6 +2667,7 @@ class Agent:
output = self._run( output = self._run(
task=task, task=task,
img=img, img=img,
streaming_callback=streaming_callback,
*args, *args,
**kwargs, **kwargs,
) )

@ -1,5 +1,6 @@
import concurrent.futures import concurrent.futures
import os import os
import time
from typing import Callable, List, Optional, Union from typing import Callable, List, Optional, Union
from swarms.structs.agent import Agent from swarms.structs.agent import Agent
@ -450,8 +451,25 @@ class ConcurrentWorkflow(BaseSwarm):
if self.show_dashboard: if self.show_dashboard:
self.display_agent_dashboard() self.display_agent_dashboard()
# Run the agent # Create a streaming callback for this agent with throttling
output = agent.run(task=task, img=img, imgs=imgs) last_update_time = [0] # Use list to allow modification in nested function
update_interval = 0.1 # Update dashboard every 100ms for smooth streaming
def streaming_callback(chunk: str):
"""Update dashboard with streaming content"""
if self.show_dashboard:
# Append the chunk to the agent's current output
current_output = self.agent_statuses[agent.agent_name]["output"]
self.agent_statuses[agent.agent_name]["output"] = current_output + chunk
# Throttle dashboard updates for better performance
current_time = time.time()
if current_time - last_update_time[0] >= update_interval:
self.display_agent_dashboard()
last_update_time[0] = current_time
# Run the agent with streaming callback
output = agent.run(task=task, img=img, imgs=imgs, streaming_callback=streaming_callback)
# Update status to completed # Update status to completed
self.agent_statuses[agent.agent_name][ self.agent_statuses[agent.agent_name][

Loading…
Cancel
Save