[FIX] Streaming Example Function

pull/1081/head
Aksh Parekh 3 weeks ago
parent bca8bbdf87
commit 42ddace77b

@ -1,62 +1,16 @@
import time
from typing import Callable
from swarms.structs.agent import Agent
from swarms.structs.sequential_workflow import SequentialWorkflow
def create_streaming_callback() -> Callable[[str, str, bool], None]:
    """
    Create a streaming callback that prints each chunk as it arrives,
    with no duplication or timestamps.

    Returns:
        Callable[[str, str, bool], None]: A callback accepting
        ``(agent_name, chunk, is_final)``. Each non-empty ``chunk`` is
        written to stdout immediately (no trailing newline, flushed so
        output appears in real time). When ``is_final`` is True, a
        single newline is printed to terminate the agent's output.
    """

    def streaming_callback(agent_name: str, chunk: str, is_final: bool) -> None:
        # Emit the raw chunk exactly once; flush so partial lines are
        # visible while the agent is still generating.
        if chunk:
            print(chunk, end="", flush=True)
        # Close the line when the agent signals completion.
        if is_final:
            print()

    return streaming_callback
def create_agents():
@ -69,6 +23,7 @@ def create_agents():
model_name="gpt-4o-mini",
max_loops=1,
streaming_on=True,
print_on=False,
),
Agent(
agent_name="Analysis_Agent",
@ -77,6 +32,7 @@ def create_agents():
model_name="gpt-4o-mini",
max_loops=1,
streaming_on=True,
print_on=False,
),
]
@ -84,7 +40,6 @@ if __name__ == "__main__":
print("🎯 SEQUENTIAL WORKFLOW STREAMING DEMO")
print("=" * 50)
# Create agents and workflow
agents = create_agents()
workflow = SequentialWorkflow(
id="research_analysis_workflow",
@ -96,18 +51,15 @@ if __name__ == "__main__":
multi_agent_collab_prompt=True,
)
# Define task
task = "What are the latest advancements in AI?"
print(f"Task: {task.strip()}")
# Create streaming callback
streaming_callback = create_streaming_callback()
print("\n🎬 EXECUTING WITH STREAMING CALLBACKS...")
print("Watch real-time agent outputs below:\n")
# Execute with streaming
result = workflow.run(
task=task,
streaming_callback=streaming_callback,

@ -133,7 +133,6 @@ class SequentialWorkflow:
agent_names = []
for agent in self.agents:
try:
# Try to get agent_name, fallback to name if not available
agent_name = (
getattr(agent, "agent_name", None)
or agent.name
@ -187,7 +186,6 @@ class SequentialWorkflow:
Exception: If any error occurs during task execution.
"""
try:
# prompt = f"{MULTI_AGENT_COLLAB_PROMPT}\n\n{task}"
return self.agent_rearrange.run(
task=task,
img=img,

Loading…
Cancel
Save