diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 8ed7707a..ce57c65f 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -996,6 +996,7 @@ class Agent: self, task: Optional[Union[str, Any]] = None, img: Optional[str] = None, + streaming_callback: Optional[Callable[[str], None]] = None, *args, **kwargs, ) -> Any: @@ -1077,6 +1078,7 @@ class Agent: task=task_prompt, img=img, current_loop=loop_count, + streaming_callback=streaming_callback, *args, **kwargs, ) @@ -1084,6 +1086,7 @@ class Agent: response = self.call_llm( task=task_prompt, current_loop=loop_count, + streaming_callback=streaming_callback, *args, **kwargs, ) @@ -2470,6 +2473,7 @@ class Agent: task: str, img: Optional[str] = None, current_loop: int = 0, + streaming_callback: Optional[Callable[[str], None]] = None, *args, **kwargs, ) -> str: @@ -2480,6 +2484,7 @@ class Agent: task (str): The task to be performed by the `llm` object. img (str, optional): Path or URL to an image file. audio (str, optional): Path or URL to an audio file. + streaming_callback (Optional[Callable[[str], None]]): Callback function to receive streaming tokens in real-time. *args: Variable length argument list. **kwargs: Arbitrary keyword arguments. @@ -2515,8 +2520,24 @@ class Agent: if hasattr( streaming_response, "__iter__" ) and not isinstance(streaming_response, str): + # Check if streaming_callback is provided (for ConcurrentWorkflow dashboard integration) + if streaming_callback is not None: + # Real-time callback streaming for dashboard integration + chunks = [] + for chunk in streaming_response: + if ( + hasattr(chunk, "choices") + and chunk.choices[0].delta.content + ): + content = chunk.choices[ + 0 + ].delta.content + chunks.append(content) + # Call the streaming callback with the new chunk + streaming_callback(content) + complete_response = "".join(chunks) # Check print_on parameter for different streaming behaviors - if self.print_on is False: + elif self.print_on is False: # Silent streaming - no printing, just collect chunks chunks = [] for chunk in streaming_response: @@ -2599,6 +2620,7 @@ class Agent: img: Optional[str] = None, imgs: Optional[List[str]] = None, correct_answer: Optional[str] = None, + streaming_callback: Optional[Callable[[str], None]] = None, *args, **kwargs, ) -> Any: @@ -2613,6 +2635,7 @@ class Agent: task (Optional[str], optional): The task to be executed. Defaults to None. img (Optional[str], optional): The image to be processed. Defaults to None. imgs (Optional[List[str]], optional): The list of images to be processed. Defaults to None. + streaming_callback (Optional[Callable[[str], None]], optional): Callback function to receive streaming tokens in real-time. Defaults to None. *args: Additional positional arguments to be passed to the execution method. **kwargs: Additional keyword arguments to be passed to the execution method. @@ -2644,6 +2667,7 @@ class Agent: output = self._run( task=task, img=img, + streaming_callback=streaming_callback, *args, **kwargs, ) diff --git a/swarms/structs/concurrent_workflow.py b/swarms/structs/concurrent_workflow.py index b60399a2..57ac2c97 100644 --- a/swarms/structs/concurrent_workflow.py +++ b/swarms/structs/concurrent_workflow.py @@ -1,5 +1,6 @@ import concurrent.futures import os +import time from typing import Callable, List, Optional, Union from swarms.structs.agent import Agent @@ -450,8 +451,25 @@ class ConcurrentWorkflow(BaseSwarm): if self.show_dashboard: self.display_agent_dashboard() - # Run the agent - output = agent.run(task=task, img=img, imgs=imgs) + # Create a streaming callback for this agent with throttling + last_update_time = [0] # Use list to allow modification in nested function + update_interval = 0.1 # Update dashboard every 100ms for smooth streaming + + def streaming_callback(chunk: str): + """Update dashboard with streaming content""" + if self.show_dashboard: + # Append the chunk to the agent's current output + current_output = self.agent_statuses[agent.agent_name]["output"] + self.agent_statuses[agent.agent_name]["output"] = current_output + chunk + + # Throttle dashboard updates for better performance + current_time = time.time() + if current_time - last_update_time[0] >= update_interval: + self.display_agent_dashboard() + last_update_time[0] = current_time + + # Run the agent with streaming callback + output = agent.run(task=task, img=img, imgs=imgs, streaming_callback=streaming_callback) # Update status to completed self.agent_statuses[agent.agent_name][