From 57a030fb193f4a9ab5cc550fef3cb5c79ae47ae4 Mon Sep 17 00:00:00 2001
From: Aksh Parekh
Date: Fri, 12 Sep 2025 23:17:46 -0700
Subject: [PATCH] wrap up

---
 swarms/structs/agent_rearrange.py     | 19 +-----
 swarms/structs/sequential_workflow.py | 10 +--
 test_sequentialworkflow_streaming.py  | 88 +++++++++++++++++++++++++++
 3 files changed, 90 insertions(+), 27 deletions(-)
 create mode 100644 test_sequentialworkflow_streaming.py

diff --git a/swarms/structs/agent_rearrange.py b/swarms/structs/agent_rearrange.py
index 6480cb14..1601a7ad 100644
--- a/swarms/structs/agent_rearrange.py
+++ b/swarms/structs/agent_rearrange.py
@@ -38,7 +38,6 @@ class AgentRearrange:
         team_awareness: bool = False,
         time_enabled: bool = False,
         message_id_on: bool = False,
-        streaming_callback: Optional[Callable[[str], None]] = None,
         *args,
         **kwargs,
     ):
@@ -56,7 +55,6 @@ class AgentRearrange:
         self.autosave = autosave
         self.time_enabled = time_enabled
         self.message_id_on = message_id_on
-        self.streaming_callback = streaming_callback
 
         self.conversation = Conversation(
             name=f"{self.name}-Conversation",
@@ -386,9 +384,6 @@ class AgentRearrange:
                     for agent_name in agent_names:
                         agent = self.agents[agent_name]
 
-                        # Set agent.streaming_on if no streaming_callback
-                        if self.streaming_callback is not None:
-                            agent.streaming_on = True
                         result = agent.run(
                             task=self.conversation.get_str(),
                             img=img,
@@ -397,11 +392,6 @@ class AgentRearrange:
                         )
 
                         result = any_to_str(result)
-
-                        # Call streaming callback with the result if provided
-                        if self.streaming_callback:
-                            self.streaming_callback(result)
-
                         self.conversation.add(
                             agent.agent_name, result
                         )
@@ -436,9 +426,6 @@ class AgentRearrange:
                         f"Added sequential awareness for {agent_name}: {awareness_info}"
                     )
 
-                    # Set agent.streaming_on if no streaming_callback
-                    if self.streaming_callback is not None:
-                        agent.streaming_on = True
                     current_task = agent.run(
                         task=self.conversation.get_str(),
                         img=img,
@@ -447,10 +434,6 @@ class AgentRearrange:
                     )
                     current_task = any_to_str(current_task)
 
-                    # Call streaming callback with the result if provided
-                    if self.streaming_callback:
-                        self.streaming_callback(current_task)
-
                     self.conversation.add(
                         agent.agent_name, current_task
                     )
@@ -720,4 +703,4 @@ def rearrange(
         *args,
         **kwargs,
     )
-    return agent_system.run(task=task, img=img)
+    return agent_system.run(task=task, img=img)
\ No newline at end of file
diff --git a/swarms/structs/sequential_workflow.py b/swarms/structs/sequential_workflow.py
index c1de30a9..73261a1d 100644
--- a/swarms/structs/sequential_workflow.py
+++ b/swarms/structs/sequential_workflow.py
@@ -49,7 +49,6 @@ class SequentialWorkflow:
         shared_memory_system: callable = None,
         multi_agent_collab_prompt: bool = True,
         team_awareness: bool = False,
-        streaming_callback: Optional[Callable[[str], None]] = None,
         *args,
         **kwargs,
     ):
@@ -65,8 +64,6 @@ class SequentialWorkflow:
             output_type (OutputType, optional): Output format for the workflow. Defaults to "dict".
             shared_memory_system (callable, optional): Callable for shared memory management. Defaults to None.
             multi_agent_collab_prompt (bool, optional): If True, appends a collaborative prompt to each agent.
-            team_awareness (bool, optional): Whether to enable team awareness. Defaults to False.
-            streaming_callback (Optional[Callable[[str], None]], optional): Callback function to receive streaming tokens in real-time. Defaults to None.
             *args: Additional positional arguments.
             **kwargs: Additional keyword arguments.
 
@@ -82,7 +79,6 @@ class SequentialWorkflow:
         self.shared_memory_system = shared_memory_system
         self.multi_agent_collab_prompt = multi_agent_collab_prompt
         self.team_awareness = team_awareness
-        self.streaming_callback = streaming_callback
 
         self.reliability_check()
         self.flow = self.sequential_flow()
@@ -95,7 +91,6 @@ class SequentialWorkflow:
             max_loops=self.max_loops,
             output_type=self.output_type,
             team_awareness=self.team_awareness,
-            streaming_callback=self.streaming_callback,
             *args,
             **kwargs,
         )
@@ -188,9 +183,6 @@ class SequentialWorkflow:
             return self.agent_rearrange.run(
                 task=task,
                 img=img,
-                streaming_callback=self.streaming_callback,
-                *args,
-                **kwargs,
             )
 
         except Exception as e:
@@ -304,4 +296,4 @@ class SequentialWorkflow:
             logger.error(
                 f"An error occurred while executing the batch of tasks concurrently: {e}"
             )
-            raise
+            raise
\ No newline at end of file
diff --git a/test_sequentialworkflow_streaming.py b/test_sequentialworkflow_streaming.py
new file mode 100644
index 00000000..310c9cc8
--- /dev/null
+++ b/test_sequentialworkflow_streaming.py
@@ -0,0 +1,88 @@
+from swarms.structs.agent import Agent
+from swarms.structs.sequential_workflow import SequentialWorkflow
+
+def run_workflow_with_streaming_callback(task, streaming_callback):
+    """
+    Run a sequential workflow with two agents and a streaming callback.
+
+    Args:
+        task (str): The task to process through the workflow.
+        streaming_callback (callable): Function to handle streaming output.
+
+    Returns:
+        The final result from the workflow.
+    """
+
+    agent1 = Agent(
+        name="Research Agent",
+        description="A research agent that can answer questions",
+        model_name="gpt-4o",
+        system_prompt=(
+            "You are a ResearchAgent. Your task is to research and gather "
+            "information about the given topic. Provide comprehensive research "
+            "findings and key insights."
+        ),
+        max_loops=1,
+        interactive=True,
+        verbose=True,
+    )
+
+    agent2 = Agent(
+        name="Analysis Agent",
+        description="An analysis agent that draws conclusions from research",
+        model_name="gpt-4o-mini",
+        system_prompt=(
+            "You are an AnalysisAgent. Your task is to analyze the research "
+            "provided by the previous agent and draw meaningful conclusions. "
+            "Provide detailed analysis and actionable insights."
+        ),
+        max_loops=1,
+        interactive=True,
+        verbose=True,
+    )
+
+    workflow = SequentialWorkflow(
+        id="research_analysis_workflow",
+        name="Research Analysis Workflow",
+        description="A sequential workflow that researches and analyzes topics",
+        agents=[agent1, agent2],
+        max_loops=1,
+        output_type="str",
+        streaming_callback=streaming_callback,
+        multi_agent_collab_prompt=True,
+    )
+    return workflow.run(task)
+
+if __name__ == "__main__":
+
+# ## REGULAR STREAMING CALLBACK
+# def streaming_callback(token):
+#     print(token, end="", flush=True)
+
+# run_workflow_with_streaming_callback(
+#     task="What are the latest advancements in AI?",
+#     streaming_callback=streaming_callback,
+# )
+
+
+# ## CUSTOM BUFFERING STREAMING_CALLBACK BASED ON DEV PREFERENCE
+# buffer = []
+# def streaming_callback(token):
+#     buffer.append(token)
+#     # Print in bigger chunks (e.g., every 20 tokens or on final flush)
+#     if len(buffer) >= 20 or token.endswith("\n"):
+#         print("".join(buffer), end="", flush=True)
+#         buffer.clear()
+#     # Optionally, you could add a flush at the end of the run if needed
+
+# run_workflow_with_streaming_callback(
+#     task="What are the latest advancements in AI?",
+#     streaming_callback=streaming_callback,
+# )
+
+
+## NO ADDED STREAMING_CALLBACK
+    run_workflow_with_streaming_callback(
+        task="What are the latest advancements in AI?",
+        streaming_callback=None,
+    )
\ No newline at end of file