[FIX] Streaming Callback & Demo Reconfig

pull/1081/merge^2
Aksh Parekh 3 weeks ago
parent 93612a9307
commit bca8bbdf87

@ -0,0 +1,119 @@
import time
from typing import Callable
from swarms.structs.agent import Agent
from swarms.structs.sequential_workflow import SequentialWorkflow
def create_streaming_callback() -> Callable[[str, str, bool], None]:
"""
Create a streaming callback that shows live paragraph formation.
"""
agent_buffers = {}
paragraph_count = {}
def streaming_callback(agent_name: str, chunk: str, is_final: bool):
timestamp = time.strftime("%H:%M:%S")
# Initialize buffers for new agents
if agent_name not in agent_buffers:
agent_buffers[agent_name] = ""
paragraph_count[agent_name] = 1
print(f"\n🎬 [{timestamp}] {agent_name} starting...")
print("=" * 60)
if chunk.strip():
# Split chunk into tokens (words/punctuation)
tokens = chunk.replace("\n", " \n ").split()
for token in tokens:
# Handle paragraph breaks
if token == "\n":
if agent_buffers[agent_name].strip():
print(
f"\n📄 [{timestamp}] {agent_name} - Paragraph {paragraph_count[agent_name]} Complete:"
)
print(f"{agent_buffers[agent_name].strip()}")
print("=" * 60)
paragraph_count[agent_name] += 1
agent_buffers[agent_name] = ""
else:
# Add token to buffer and show live accumulation
agent_buffers[agent_name] += token + " "
print(
f"\r[{timestamp}] {agent_name} | {agent_buffers[agent_name].strip()}",
end="",
flush=True,
)
if is_final:
print() # New line after live updates
# Print any remaining content as final paragraph
if agent_buffers[agent_name].strip():
print(
f"\n✅ [{timestamp}] {agent_name} COMPLETED - Final Paragraph:"
)
print(f"{agent_buffers[agent_name].strip()}")
print()
print(f"🎯 [{timestamp}] {agent_name} finished processing")
print(f"📊 Total paragraphs processed: {paragraph_count[agent_name] - 1}")
print("=" * 60)
return streaming_callback
def create_agents():
"""Create specialized agents for the workflow."""
return [
Agent(
agent_name="Research_Agent",
agent_description="Specialized in gathering and analyzing information",
system_prompt="You are a research specialist. Provide detailed, accurate information on any topic.",
model_name="gpt-4o-mini",
max_loops=1,
streaming_on=True,
),
Agent(
agent_name="Analysis_Agent",
agent_description="Expert at analyzing data and drawing insights",
system_prompt="You are an analysis expert. Break down complex information and provide clear insights.",
model_name="gpt-4o-mini",
max_loops=1,
streaming_on=True,
),
]
if __name__ == "__main__":
print("🎯 SEQUENTIAL WORKFLOW STREAMING DEMO")
print("=" * 50)
# Create agents and workflow
agents = create_agents()
workflow = SequentialWorkflow(
id="research_analysis_workflow",
name="Research Analysis Workflow",
description="A sequential workflow that researches and analyzes topics",
agents=agents,
max_loops=1,
output_type="str",
multi_agent_collab_prompt=True,
)
# Define task
task = "What are the latest advancements in AI?"
print(f"Task: {task.strip()}")
# Create streaming callback
streaming_callback = create_streaming_callback()
print("\n🎬 EXECUTING WITH STREAMING CALLBACKS...")
print("Watch real-time agent outputs below:\n")
# Execute with streaming
result = workflow.run(
task=task,
streaming_callback=streaming_callback,
)
print("\n🎉 EXECUTION COMPLETED!")
print("\n📊 FINAL RESULT:")
print("-" * 50)
print(result)

@ -5,7 +5,6 @@ from typing import Any, Callable, Dict, List, Optional, Union
from swarms.structs.agent import Agent from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation from swarms.structs.conversation import Conversation
from swarms.telemetry.main import log_agent_data from swarms.telemetry.main import log_agent_data
from swarms.utils.any_to_str import any_to_str
from swarms.utils.history_output_formatter import ( from swarms.utils.history_output_formatter import (
history_output_formatter, history_output_formatter,
) )
@ -313,6 +312,9 @@ class AgentRearrange:
task: str = None, task: str = None,
img: str = None, img: str = None,
custom_tasks: Dict[str, str] = None, custom_tasks: Dict[str, str] = None,
streaming_callback: Optional[
Callable[[str, str, bool], None]
] = None,
*args, *args,
**kwargs, **kwargs,
): ):
@ -347,8 +349,6 @@ class AgentRearrange:
return "Invalid flow configuration." return "Invalid flow configuration."
tasks = self.flow.split("->") tasks = self.flow.split("->")
current_task = task
response_dict = {}
logger.info( logger.info(
f"Starting task execution with {len(tasks)} steps" f"Starting task execution with {len(tasks)} steps"
@ -386,30 +386,51 @@ class AgentRearrange:
for agent_name in agent_names: for agent_name in agent_names:
agent = self.agents[agent_name] agent = self.agents[agent_name]
if self.streaming_callback is not None: # Handle streaming callback if provided
agent.streaming_on = True if streaming_callback is not None:
result = agent.run(
task=self.conversation.get_str(), def agent_streaming_callback(chunk: str):
img=img, """Wrapper for agent streaming callback."""
try:
if chunk is not None and chunk.strip():
streaming_callback(
agent_name, chunk, False
)
except Exception as callback_error:
if self.verbose:
logger.warning(
f"[STREAMING] Callback failed for {agent_name}: {str(callback_error)}"
)
output = agent.run(
task=f"History: {self.conversation.get_str()} \n\n Task: {task}",
streaming_callback=agent_streaming_callback,
*args, *args,
**kwargs, **kwargs,
) )
result = any_to_str(result)
# Call completion callback
if self.streaming_callback: try:
self.streaming_callback(result) streaming_callback(agent_name, "", True)
except Exception as callback_error:
self.conversation.add( if self.verbose:
agent.agent_name, result logger.warning(
f"[STREAMING] Completion callback failed for {agent_name}: {str(callback_error)}"
)
else:
output = agent.run(
task=f"History: {self.conversation.get_str()} \n\n Task: {task}",
*args,
**kwargs,
) )
self.conversation.add(role=agent_name, content=output)
response_dict[agent_name] = result if self.verbose:
logger.debug( logger.success(
f"Agent {agent_name} output: {result}" f"[SUCCESS] Agent {agent_name} completed task successfully"
) )
",".join(agent_names) self.conversation.add(role=agent_name, content=output)
else: else:
# Sequential processing # Sequential processing
@ -433,24 +454,50 @@ class AgentRearrange:
f"Added sequential awareness for {agent_name}: {awareness_info}" f"Added sequential awareness for {agent_name}: {awareness_info}"
) )
if self.streaming_callback is not None: # Handle streaming callback if provided
agent.streaming_on = True if streaming_callback is not None:
current_task = agent.run(
task=self.conversation.get_str(), def agent_streaming_callback(chunk: str):
img=img, """Wrapper for agent streaming callback."""
try:
if chunk is not None and chunk.strip():
streaming_callback(
agent_name, chunk, False
)
except Exception as callback_error:
if self.verbose:
logger.warning(
f"[STREAMING] Callback failed for {agent_name}: {str(callback_error)}"
)
output = agent.run(
task=f"History: {self.conversation.get_str()} \n\n Task: {task}",
streaming_callback=agent_streaming_callback,
*args, *args,
**kwargs, **kwargs,
) )
current_task = any_to_str(current_task)
if self.streaming_callback: # Call completion callback
self.streaming_callback(current_task) try:
streaming_callback(agent_name, "", True)
except Exception as callback_error:
if self.verbose:
logger.warning(
f"[STREAMING] Completion callback failed for {agent_name}: {str(callback_error)}"
)
else:
output = agent.run(
task=f"History: {self.conversation.get_str()} \n\n Task: {task}",
*args,
**kwargs,
)
self.conversation.add(role=agent_name, content=output)
self.conversation.add( if self.verbose:
agent.agent_name, current_task logger.success(
f"[SUCCESS] Agent {agent_name} completed task successfully"
) )
response_dict[agent_name] = current_task
loop_count += 1 loop_count += 1

@ -49,7 +49,7 @@ class SequentialWorkflow:
shared_memory_system: callable = None, shared_memory_system: callable = None,
multi_agent_collab_prompt: bool = True, multi_agent_collab_prompt: bool = True,
team_awareness: bool = False, team_awareness: bool = False,
streaming_callback: Optional[Callable[[str], None]] = None, streaming_callback: Optional[Callable[[str, str, bool], None]] = None,
*args, *args,
**kwargs, **kwargs,
): ):
@ -163,6 +163,9 @@ class SequentialWorkflow:
task: str, task: str,
img: Optional[str] = None, img: Optional[str] = None,
imgs: Optional[List[str]] = None, imgs: Optional[List[str]] = None,
streaming_callback: Optional[
Callable[[str, str, bool], None]
] = None,
*args, *args,
**kwargs, **kwargs,
): ):
@ -188,7 +191,7 @@ class SequentialWorkflow:
return self.agent_rearrange.run( return self.agent_rearrange.run(
task=task, task=task,
img=img, img=img,
streaming_callback=self.streaming_callback, streaming_callback=streaming_callback,
*args, *args,
**kwargs, **kwargs,
) )

Loading…
Cancel
Save