diff --git a/examples/multi_agent/concurrent_examples/streaming_concurrent_workflow.py b/examples/multi_agent/concurrent_examples/streaming_concurrent_workflow.py new file mode 100644 index 00000000..c8dc9366 --- /dev/null +++ b/examples/multi_agent/concurrent_examples/streaming_concurrent_workflow.py @@ -0,0 +1,62 @@ +from swarms import Agent, ConcurrentWorkflow, SwarmRouter + +# Initialize market research agent +market_researcher = Agent( + agent_name="Market-Researcher", + system_prompt="""You are a market research specialist. Your tasks include: + 1. Analyzing market trends and patterns + 2. Identifying market opportunities and threats + 3. Evaluating competitor strategies + 4. Assessing customer needs and preferences + 5. Providing actionable market insights""", + model_name="claude-3-5-sonnet-20240620", + max_loops=1, + streaming_on=True, + print_on=False, +) + +# Initialize financial analyst agent +financial_analyst = Agent( + agent_name="Financial-Analyst", + system_prompt="""You are a financial analysis expert. Your responsibilities include: + 1. Analyzing financial statements + 2. Evaluating investment opportunities + 3. Assessing risk factors + 4. Providing financial forecasts + 5. Recommending financial strategies""", + model_name="claude-3-5-sonnet-20240620", + max_loops=1, + streaming_on=True, + print_on=False, +) + +# Initialize technical analyst agent +technical_analyst = Agent( + agent_name="Technical-Analyst", + system_prompt="""You are a technical analysis specialist. Your focus areas include: + 1. Analyzing price patterns and trends + 2. Evaluating technical indicators + 3. Identifying support and resistance levels + 4. Assessing market momentum + 5. Providing trading recommendations""", + model_name="claude-3-5-sonnet-20240620", + max_loops=1, + streaming_on=True, + print_on=False, +) + +# Create list of agents +agents = [market_researcher, financial_analyst, technical_analyst] + +# Initialize the concurrent workflow +workflow = ConcurrentWorkflow( + name="market-analysis-workflow", + agents=agents, + max_loops=1, + show_dashboard=True, +) + +# Run the workflow +result = workflow.run( + "Analyze Tesla (TSLA) stock from market, financial, and technical perspectives" +) \ No newline at end of file diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 8ed7707a..ce57c65f 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -996,6 +996,7 @@ class Agent: self, task: Optional[Union[str, Any]] = None, img: Optional[str] = None, + streaming_callback: Optional[Callable[[str], None]] = None, *args, **kwargs, ) -> Any: @@ -1077,6 +1078,7 @@ class Agent: task=task_prompt, img=img, current_loop=loop_count, + streaming_callback=streaming_callback, *args, **kwargs, ) @@ -1084,6 +1086,7 @@ class Agent: response = self.call_llm( task=task_prompt, current_loop=loop_count, + streaming_callback=streaming_callback, *args, **kwargs, ) @@ -2470,6 +2473,7 @@ class Agent: task: str, img: Optional[str] = None, current_loop: int = 0, + streaming_callback: Optional[Callable[[str], None]] = None, *args, **kwargs, ) -> str: @@ -2480,6 +2484,7 @@ class Agent: task (str): The task to be performed by the `llm` object. img (str, optional): Path or URL to an image file. audio (str, optional): Path or URL to an audio file. + streaming_callback (Optional[Callable[[str], None]]): Callback function to receive streaming tokens in real-time. *args: Variable length argument list. **kwargs: Arbitrary keyword arguments. @@ -2515,8 +2520,24 @@ class Agent: if hasattr( streaming_response, "__iter__" ) and not isinstance(streaming_response, str): + # Check if streaming_callback is provided (for ConcurrentWorkflow dashboard integration) + if streaming_callback is not None: + # Real-time callback streaming for dashboard integration + chunks = [] + for chunk in streaming_response: + if ( + hasattr(chunk, "choices") + and chunk.choices[0].delta.content + ): + content = chunk.choices[ + 0 + ].delta.content + chunks.append(content) + # Call the streaming callback with the new chunk + streaming_callback(content) + complete_response = "".join(chunks) # Check print_on parameter for different streaming behaviors - if self.print_on is False: + elif self.print_on is False: # Silent streaming - no printing, just collect chunks chunks = [] for chunk in streaming_response: @@ -2599,6 +2620,7 @@ class Agent: img: Optional[str] = None, imgs: Optional[List[str]] = None, correct_answer: Optional[str] = None, + streaming_callback: Optional[Callable[[str], None]] = None, *args, **kwargs, ) -> Any: @@ -2613,6 +2635,7 @@ class Agent: task (Optional[str], optional): The task to be executed. Defaults to None. img (Optional[str], optional): The image to be processed. Defaults to None. imgs (Optional[List[str]], optional): The list of images to be processed. Defaults to None. + streaming_callback (Optional[Callable[[str], None]], optional): Callback function to receive streaming tokens in real-time. Defaults to None. *args: Additional positional arguments to be passed to the execution method. **kwargs: Additional keyword arguments to be passed to the execution method. @@ -2644,6 +2667,7 @@ class Agent: output = self._run( task=task, img=img, + streaming_callback=streaming_callback, *args, **kwargs, ) diff --git a/swarms/structs/concurrent_workflow.py b/swarms/structs/concurrent_workflow.py index 7f0c0d65..e045441d 100644 --- a/swarms/structs/concurrent_workflow.py +++ b/swarms/structs/concurrent_workflow.py @@ -1,5 +1,6 @@ import concurrent.futures import os +import time from typing import Callable, List, Optional, Union from swarms.structs.agent import Agent @@ -450,8 +451,25 @@ class ConcurrentWorkflow(BaseSwarm): if self.show_dashboard: self.display_agent_dashboard() - # Run the agent - output = agent.run(task=task, img=img, imgs=imgs) + # Create a streaming callback for this agent with throttling + last_update_time = [0] # Use list to allow modification in nested function + update_interval = 0.1 # Update dashboard every 100ms for smooth streaming + + def streaming_callback(chunk: str): + """Update dashboard with streaming content""" + if self.show_dashboard: + # Append the chunk to the agent's current output + current_output = self.agent_statuses[agent.agent_name]["output"] + self.agent_statuses[agent.agent_name]["output"] = current_output + chunk + + # Throttle dashboard updates for better performance + current_time = time.time() + if current_time - last_update_time[0] >= update_interval: + self.display_agent_dashboard() + last_update_time[0] = current_time + + # Run the agent with streaming callback + output = agent.run(task=task, img=img, imgs=imgs, streaming_callback=streaming_callback) # Update status to completed self.agent_statuses[agent.agent_name][