Merge pull request #978 from harshalmore31/Fix/concurrent-streaming

fix streaming in ConcurrentWorkflow !
pull/983/head^2
Kye Gomez 2 days ago committed by GitHub
commit b63996fc2d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,62 @@
from swarms import Agent, ConcurrentWorkflow, SwarmRouter
# Initialize market research agent
market_researcher = Agent(
agent_name="Market-Researcher",
system_prompt="""You are a market research specialist. Your tasks include:
1. Analyzing market trends and patterns
2. Identifying market opportunities and threats
3. Evaluating competitor strategies
4. Assessing customer needs and preferences
5. Providing actionable market insights""",
model_name="claude-3-5-sonnet-20240620",
max_loops=1,
streaming_on=True,
print_on=False,
)
# Initialize financial analyst agent
financial_analyst = Agent(
agent_name="Financial-Analyst",
system_prompt="""You are a financial analysis expert. Your responsibilities include:
1. Analyzing financial statements
2. Evaluating investment opportunities
3. Assessing risk factors
4. Providing financial forecasts
5. Recommending financial strategies""",
model_name="claude-3-5-sonnet-20240620",
max_loops=1,
streaming_on=True,
print_on=False,
)
# Initialize technical analyst agent
technical_analyst = Agent(
agent_name="Technical-Analyst",
system_prompt="""You are a technical analysis specialist. Your focus areas include:
1. Analyzing price patterns and trends
2. Evaluating technical indicators
3. Identifying support and resistance levels
4. Assessing market momentum
5. Providing trading recommendations""",
model_name="claude-3-5-sonnet-20240620",
max_loops=1,
streaming_on=True,
print_on=False,
)
# Create list of agents
agents = [market_researcher, financial_analyst, technical_analyst]
# Initialize the concurrent workflow
workflow = ConcurrentWorkflow(
name="market-analysis-workflow",
agents=agents,
max_loops=1,
show_dashboard=True,
)
# Run the workflow
result = workflow.run(
"Analyze Tesla (TSLA) stock from market, financial, and technical perspectives"
)

@ -996,6 +996,7 @@ class Agent:
self, self,
task: Optional[Union[str, Any]] = None, task: Optional[Union[str, Any]] = None,
img: Optional[str] = None, img: Optional[str] = None,
streaming_callback: Optional[Callable[[str], None]] = None,
*args, *args,
**kwargs, **kwargs,
) -> Any: ) -> Any:
@ -1077,6 +1078,7 @@ class Agent:
task=task_prompt, task=task_prompt,
img=img, img=img,
current_loop=loop_count, current_loop=loop_count,
streaming_callback=streaming_callback,
*args, *args,
**kwargs, **kwargs,
) )
@ -1084,6 +1086,7 @@ class Agent:
response = self.call_llm( response = self.call_llm(
task=task_prompt, task=task_prompt,
current_loop=loop_count, current_loop=loop_count,
streaming_callback=streaming_callback,
*args, *args,
**kwargs, **kwargs,
) )
@ -2470,6 +2473,7 @@ class Agent:
task: str, task: str,
img: Optional[str] = None, img: Optional[str] = None,
current_loop: int = 0, current_loop: int = 0,
streaming_callback: Optional[Callable[[str], None]] = None,
*args, *args,
**kwargs, **kwargs,
) -> str: ) -> str:
@ -2480,6 +2484,7 @@ class Agent:
task (str): The task to be performed by the `llm` object. task (str): The task to be performed by the `llm` object.
img (str, optional): Path or URL to an image file. img (str, optional): Path or URL to an image file.
audio (str, optional): Path or URL to an audio file. audio (str, optional): Path or URL to an audio file.
streaming_callback (Optional[Callable[[str], None]]): Callback function to receive streaming tokens in real-time.
*args: Variable length argument list. *args: Variable length argument list.
**kwargs: Arbitrary keyword arguments. **kwargs: Arbitrary keyword arguments.
@ -2515,8 +2520,24 @@ class Agent:
if hasattr( if hasattr(
streaming_response, "__iter__" streaming_response, "__iter__"
) and not isinstance(streaming_response, str): ) and not isinstance(streaming_response, str):
# Check if streaming_callback is provided (for ConcurrentWorkflow dashboard integration)
if streaming_callback is not None:
# Real-time callback streaming for dashboard integration
chunks = []
for chunk in streaming_response:
if (
hasattr(chunk, "choices")
and chunk.choices[0].delta.content
):
content = chunk.choices[
0
].delta.content
chunks.append(content)
# Call the streaming callback with the new chunk
streaming_callback(content)
complete_response = "".join(chunks)
# Check print_on parameter for different streaming behaviors # Check print_on parameter for different streaming behaviors
if self.print_on is False: elif self.print_on is False:
# Silent streaming - no printing, just collect chunks # Silent streaming - no printing, just collect chunks
chunks = [] chunks = []
for chunk in streaming_response: for chunk in streaming_response:
@ -2599,6 +2620,7 @@ class Agent:
img: Optional[str] = None, img: Optional[str] = None,
imgs: Optional[List[str]] = None, imgs: Optional[List[str]] = None,
correct_answer: Optional[str] = None, correct_answer: Optional[str] = None,
streaming_callback: Optional[Callable[[str], None]] = None,
*args, *args,
**kwargs, **kwargs,
) -> Any: ) -> Any:
@ -2613,6 +2635,7 @@ class Agent:
task (Optional[str], optional): The task to be executed. Defaults to None. task (Optional[str], optional): The task to be executed. Defaults to None.
img (Optional[str], optional): The image to be processed. Defaults to None. img (Optional[str], optional): The image to be processed. Defaults to None.
imgs (Optional[List[str]], optional): The list of images to be processed. Defaults to None. imgs (Optional[List[str]], optional): The list of images to be processed. Defaults to None.
streaming_callback (Optional[Callable[[str], None]], optional): Callback function to receive streaming tokens in real-time. Defaults to None.
*args: Additional positional arguments to be passed to the execution method. *args: Additional positional arguments to be passed to the execution method.
**kwargs: Additional keyword arguments to be passed to the execution method. **kwargs: Additional keyword arguments to be passed to the execution method.
@ -2644,6 +2667,7 @@ class Agent:
output = self._run( output = self._run(
task=task, task=task,
img=img, img=img,
streaming_callback=streaming_callback,
*args, *args,
**kwargs, **kwargs,
) )

@ -1,5 +1,6 @@
import concurrent.futures import concurrent.futures
import os import os
import time
from typing import Callable, List, Optional, Union from typing import Callable, List, Optional, Union
from swarms.structs.agent import Agent from swarms.structs.agent import Agent
@ -450,8 +451,25 @@ class ConcurrentWorkflow(BaseSwarm):
if self.show_dashboard: if self.show_dashboard:
self.display_agent_dashboard() self.display_agent_dashboard()
# Run the agent # Create a streaming callback for this agent with throttling
output = agent.run(task=task, img=img, imgs=imgs) last_update_time = [0] # Use list to allow modification in nested function
update_interval = 0.1 # Update dashboard every 100ms for smooth streaming
def streaming_callback(chunk: str):
"""Update dashboard with streaming content"""
if self.show_dashboard:
# Append the chunk to the agent's current output
current_output = self.agent_statuses[agent.agent_name]["output"]
self.agent_statuses[agent.agent_name]["output"] = current_output + chunk
# Throttle dashboard updates for better performance
current_time = time.time()
if current_time - last_update_time[0] >= update_interval:
self.display_agent_dashboard()
last_update_time[0] = current_time
# Run the agent with streaming callback
output = agent.run(task=task, img=img, imgs=imgs, streaming_callback=streaming_callback)
# Update status to completed # Update status to completed
self.agent_statuses[agent.agent_name][ self.agent_statuses[agent.agent_name][

Loading…
Cancel
Save