diff --git a/README.md b/README.md
index e8dd3220..bfb77944 100644
--- a/README.md
+++ b/README.md
@@ -27,7 +27,7 @@ Run example in Collab:
-### `Agent` Example
+### `Agent`
- Reliable structure that provides LLMs with autonomy
- Extremely customizable with stopping conditions, interactivity, dynamic temperature, loop intervals, and much more
- Enterprise grade and production ready: `Agent` is designed and optimized for automating real-world tasks at scale!
@@ -127,15 +127,69 @@ for task in workflow.tasks:
```
-## `Multi Modal Autonomous Agents`
-- Run the agent with multiple modalities useful for various real-world tasks in manufacturing, logistics, and health.
+
+
+### `ModelParallelizer`
+- Concurrent Execution of Multiple Models: ModelParallelizer runs multiple models concurrently and collects their outputs side by side, making it easy to compare performance and results and decide which model best fits your task.
+
+- Plug-and-Play Integration: The structure integrates seamlessly with various models, including OpenAIChat, Anthropic, Mixtral, and Gemini. You can plug in any of these models and start using them without extensive modifications or setup.
+
```python
-# Description: This is an example of how to use the Agent class to run a multi-modal workflow
import os
+
from dotenv import load_dotenv
-from swarms.models.gpt4_vision_api import GPT4VisionAPI
-from swarms.structs import Agent
+
+from swarms.models import Anthropic, Gemini, Mixtral, OpenAIChat
+from swarms.swarms import ModelParallelizer
+
+load_dotenv()
+
+# API Keys
+anthropic_api_key = os.getenv("ANTHROPIC_API_KEY")
+openai_api_key = os.getenv("OPENAI_API_KEY")
+gemini_api_key = os.getenv("GEMINI_API_KEY")
+
+# Initialize the models
+llm = OpenAIChat(openai_api_key=openai_api_key)
+anthropic = Anthropic(anthropic_api_key=anthropic_api_key)
+mixtral = Mixtral()
+gemini = Gemini(gemini_api_key=gemini_api_key)
+
+# Initialize the parallelizer
+llms = [llm, anthropic, mixtral, gemini]
+parallelizer = ModelParallelizer(llms)
+
+# Set the task
+task = "Generate a 10,000 word blog on health and wellness."
+
+# Run the task
+out = parallelizer.run(task)
+
+# Print the responses one by one
+for i, response in enumerate(out):
+    print(f"Response from LLM {i}: {response}")
+```
+
+
+### Simple Conversational Agent
+- Plug-and-play conversational agent with `GPT4`, `Mixtral`, or any of our models
+- Reliable conversational structure that holds messages together, with dynamic handling for long-context conversations and auto chunking (illustrated after the example below)
+- Simple and dependable: an easy way to consistently get the responses you want
+
+```python
+import os
+
+from dotenv import load_dotenv
+
+from swarms import (
+ OpenAIChat,
+ Conversation,
+)
+
# Load the environment variables
load_dotenv()
@@ -144,65 +198,161 @@ load_dotenv()
api_key = os.environ.get("OPENAI_API_KEY")
# Initialize the language model
-llm = GPT4VisionAPI(
- openai_api_key=api_key,
- max_tokens=500,
-)
+llm = OpenAIChat(openai_api_key=api_key, model_name="gpt-4")
-# Initialize the task
-task = (
- "Analyze this image of an assembly line and identify any issues such as"
- " misaligned parts, defects, or deviations from the standard assembly"
- " process. IF there is anything unsafe in the image, explain why it is"
- " unsafe and how it could be improved."
+# Run the language model in a loop
+def interactive_conversation(llm):
+    conv = Conversation(time_enabled=True)
+    while True:
+        user_input = input("User: ")
+        if user_input.lower() == "quit":
+            break
+        conv.add("user", user_input)
+        task = conv.return_history_as_string()  # Full history as the prompt
+        out = llm(task)
+        conv.add("assistant", out)
+        print(f"Assistant: {out}")
+ conv.display_conversation()
+ conv.export_conversation("conversation.txt")
+
+
+# Start the conversation with the LLM initialized above
+interactive_conversation(llm)
+
+```
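+
+The auto chunking mentioned above is handled inside `Conversation` itself. As a rough illustration of the idea only (not the library's implementation), a long history could be clipped to a fixed character budget before each model call:
+
+```python
+def clip_history(history: str, max_chars: int = 12_000) -> str:
+    """Keep only the most recent characters of a long history.
+    Illustrative stand-in for Conversation's own auto chunking."""
+    return history[-max_chars:]
+
+
+# Inside the loop above, clip before calling the model:
+# out = llm(clip_history(conv.return_history_as_string()))
+```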
+
+
+### `SwarmNetwork`
+- Efficient Task Management: SwarmNetwork's intelligent agent pool and task queue management system ensure tasks are distributed evenly across agents, leading to efficient resource use and faster task completion.
+
+- Scalability: SwarmNetwork can dynamically scale the number of agents based on the number of pending tasks, adding agents to absorb spikes in workload and removing them to conserve resources when the queue drains (see the sketch after the example below).
+
+- Versatile Deployment Options: Each agent can run on its own thread, process, container, machine, or even cluster, giving a high degree of flexibility to match the user's needs and infrastructure.
+
+```python
+import os
+
+from dotenv import load_dotenv
+
+# Import the OpenAIChat model, the Agent struct, and SwarmNetwork
+from swarms import OpenAIChat, Agent, SwarmNetwork
+
+# Load the environment variables
+load_dotenv()
+
+# Get the API key from the environment
+api_key = os.environ.get("OPENAI_API_KEY")
+
+# Initialize the language model
+llm = OpenAIChat(
+ temperature=0.5,
+ openai_api_key=api_key,
)
-img = "assembly_line.jpg"
## Initialize the workflow
-agent = Agent(
- llm=llm,
- max_loops="auto",
- autosave=True,
- dashboard=True,
- multi_modal=True
+agent = Agent(llm=llm, max_loops=1, agent_name="Social Media Manager")
+agent2 = Agent(llm=llm, max_loops=1, agent_name="Product Manager")
+agent3 = Agent(llm=llm, max_loops=1, agent_name="SEO Manager")
+
+
+# Load the swarmnet with the agents
+swarmnet = SwarmNetwork(
+ agents=[agent, agent2, agent3],
)
+# List the agents in the swarm network
+out = swarmnet.list_agents()
+print(out)
+
# Run the workflow on a task
-agent.run(task=task, img=img)
+out = swarmnet.run_single_agent(
+ agent2.id, "Generate a 10,000 word blog on health and wellness."
+)
+print(out)
+# Run all the agents in the swarm network on a task
+out = swarmnet.run_many_agents(
+ "Generate a 10,000 word blog on health and wellness."
+)
+print(out)
+
```
-### `OmniModalAgent`
-- An agent that can understand any modality and conditionally generate any modality.
+### `Task`
+Task Execution: The `Task` structure lets an assigned agent execute a task; call the `run` method to execute it. Think of it as Zapier for LLMs.
+
+- Task Description: Each `Task` can have a description, providing a human-readable explanation of what the task is intended to do.
+- Task Scheduling: Tasks can be scheduled for execution at a specific time using the `schedule_time` attribute.
+- Task Triggers: The `set_trigger` method sets a trigger function that is executed before the task.
+- Task Actions: The `set_action` method sets an action function that is executed after the task.
+- Task Conditions: The `set_condition` method sets a condition function; the task is executed only if this function returns `True`.
+- Task Dependencies: The `add_dependency` method adds dependencies to the task; the task is executed only once all of its dependencies have completed.
+- Task Priority: The `set_priority` method sets the task's priority; higher-priority tasks are executed before lower-priority ones.
+- Task History: The `history` attribute keeps a list of all results from the task's executions, which is useful for debugging and for tasks that run multiple times. A sketch of these hooks follows the example below.
```python
-from swarms.agents.omni_modal_agent import OmniModalAgent, OpenAIChat
+from swarms.structs import Task, Agent
from swarms.models import OpenAIChat
from dotenv import load_dotenv
import os
+
# Load the environment variables
load_dotenv()
-# Get the API key from the environment
-api_key = os.environ.get("OPENAI_API_KEY")
-# Initialize the language model
-llm = OpenAIChat(
- temperature=0.5,
- model_name="gpt-4",
- openai_api_key=api_key,
+# Define a function to be used as the action
+def my_action():
+ print("Action executed")
+
+
+# Define a function to be used as the condition
+def my_condition():
+ print("Condition checked")
+ return True
+
+
+# Create an agent
+agent = Agent(
+ llm=OpenAIChat(openai_api_key=os.environ["OPENAI_API_KEY"]),
+ max_loops=1,
+ dashboard=False,
)
+# Create a task
+task = Task(description="What's the weather in Miami", agent=agent)
+
+# Set the action and condition
+task.set_action(my_action)
+task.set_condition(my_condition)
+
+# Execute the task
+print("Executing task...")
+task.run()
+
+# Check if the task is completed
+if task.is_completed():
+ print("Task completed")
+else:
+ print("Task not completed")
+
+# Output the result of the task
+print(f"Task result: {task.result}")
+
-agent = OmniModalAgent(llm)
-agent.run("Generate a video of a swarm of fish and then make an image out of the video")
```
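+
+The scheduling, trigger, dependency, and priority hooks listed above are not exercised in this example. Here is a minimal sketch of how they might be wired, assuming `set_trigger`, `add_dependency`, `set_priority`, and the `schedule_time` attribute behave as described:
+
+```python
+from datetime import datetime, timedelta
+
+# Trigger: runs before the task executes
+task.set_trigger(lambda: print("Trigger fired"))
+
+# Dependency: the task runs only after its prerequisite completes
+prerequisite = Task(description="Fetch raw weather data", agent=agent)
+task.add_dependency(prerequisite)
+
+# Priority: higher-priority tasks are executed first (scale assumed)
+task.set_priority(1)
+
+# Scheduling: run the task one minute from now
+task.schedule_time = datetime.now() + timedelta(minutes=1)
+
+# History: the result of each execution accumulates here
+print(task.history)
+```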
---
+
+## Real-World Deployment
+
### Multi-Agent Swarm for Logistics
- Swarms is a framework designed for real-world deployment. Here is a demo presenting a fully ready-to-use swarm for a vast array of logistics tasks.
- Swarms is designed to be modular and reliable for real-world deployments.
@@ -312,6 +462,58 @@ efficiency_analysis = efficiency_agent.run(
factory_image,
)
```
+---
+
+
+## `Multi-Modal Autonomous Agents`
+- Run the agent with multiple modalities, useful for various real-world tasks in manufacturing, logistics, and health.
+
+```python
+# Description: This is an example of how to use the Agent class to run a multi-modal workflow
+import os
+from dotenv import load_dotenv
+from swarms.models.gpt4_vision_api import GPT4VisionAPI
+from swarms.structs import Agent
+
+# Load the environment variables
+load_dotenv()
+
+# Get the API key from the environment
+api_key = os.environ.get("OPENAI_API_KEY")
+
+# Initialize the language model
+llm = GPT4VisionAPI(
+ openai_api_key=api_key,
+ max_tokens=500,
+)
+
+# Initialize the task
+task = (
+ "Analyze this image of an assembly line and identify any issues such as"
+ " misaligned parts, defects, or deviations from the standard assembly"
+ " process. IF there is anything unsafe in the image, explain why it is"
+ " unsafe and how it could be improved."
+)
+img = "assembly_line.jpg"
+
+# Initialize the agent
+agent = Agent(
+ llm=llm,
+ max_loops="auto",
+ autosave=True,
+ dashboard=True,
+ multi_modal=True
+)
+
+# Run the agent on the task with the image
+agent.run(task=task, img=img)
+
+
+```
+
+---
+
+## Multi-Modal Model APIs
### `Gemini`
- Deploy Google's Gemini with high reliability using our visual chain-of-thought prompt, which enables more dependable responses
@@ -460,160 +662,6 @@ print(video_path)
```
-### `ModelParallelizer`
-- Concurrent Execution of Multiple Models: The ModelParallelizer allows you to run multiple models concurrently, comparing their outputs. This feature enables you to easily compare the performance and results of different models, helping you make informed decisions about which model to use for your specific task.
-
-- Plug-and-Play Integration: The structure provides a seamless integration with various models, including OpenAIChat, Anthropic, Mixtral, and Gemini. You can easily plug in any of these models and start using them without the need for extensive modifications or setup.
-
-
-```python
-import os
-
-from dotenv import load_dotenv
-
-from swarms.models import Anthropic, Gemini, Mixtral, OpenAIChat
-from swarms.swarms import ModelParallelizer
-
-load_dotenv()
-
-# API Keys
-anthropic_api_key = os.getenv("ANTHROPIC_API_KEY")
-openai_api_key = os.getenv("OPENAI_API_KEY")
-gemini_api_key = os.getenv("GEMINI_API_KEY")
-
-# Initialize the models
-llm = OpenAIChat(openai_api_key=openai_api_key)
-anthropic = Anthropic(anthropic_api_key=anthropic_api_key)
-mixtral = Mixtral()
-gemini = Gemini(gemini_api_key=gemini_api_key)
-
-# Initialize the parallelizer
-llms = [llm, anthropic, mixtral, gemini]
-parallelizer = ModelParallelizer(llms)
-
-# Set the task
-task = "Generate a 10,000 word blog on health and wellness."
-
-# Run the task
-out = parallelizer.run(task)
-
-# Print the responses 1 by 1
-for i in range(len(out)):
- print(f"Response from LLM {i}: {out[i]}")
-```
-
-
-### Simple Conversational Agent
-- Plug in and play conversational agent with `GPT4`, `Mixytral`, or any of our models
-- Reliable conversational structure to hold messages together with dynamic handling for long context conversations and interactions with auto chunking
-- Reliable, this simple system will always provide responses you want.
-
-```python
-import os
-
-from dotenv import load_dotenv
-
-from swarms import (
- OpenAIChat,
- Conversation,
-)
-
-conv = Conversation(
- time_enabled=True,
-)
-
-# Load the environment variables
-load_dotenv()
-
-# Get the API key from the environment
-api_key = os.environ.get("OPENAI_API_KEY")
-
-# Initialize the language model
-llm = OpenAIChat(openai_api_key=api_key, model_name="gpt-4")
-
-# Run the language model in a loop
-def interactive_conversation(llm):
- conv = Conversation()
- while True:
- user_input = input("User: ")
- conv.add("user", user_input)
- if user_input.lower() == "quit":
- break
- task = (
- conv.return_history_as_string()
- ) # Get the conversation history
- out = llm(task)
- conv.add("assistant", out)
- print(
- f"Assistant: {out}",
- )
- conv.display_conversation()
- conv.export_conversation("conversation.txt")
-
-
-# Replace with your LLM instance
-interactive_conversation(llm)
-
-```
-
-
-### `SwarmNetwork`
-- Efficient Task Management: SwarmNetwork's intelligent agent pool and task queue management system ensures tasks are distributed evenly across agents. This leads to efficient use of resources and faster task completion.
-
-- Scalability: SwarmNetwork can dynamically scale the number of agents based on the number of pending tasks. This means it can handle an increase in workload by adding more agents, and conserve resources when the workload is low by reducing the number of agents.
-
-- Versatile Deployment Options: With SwarmNetwork, each agent can be run on its own thread, process, container, machine, or even cluster. This provides a high degree of flexibility and allows for deployment that best suits the user's needs and infrastructure.
-
-```python
-import os
-
-from dotenv import load_dotenv
-
-# Import the OpenAIChat model and the Agent struct
-from swarms import OpenAIChat, Agent, SwarmNetwork
-
-# Load the environment variables
-load_dotenv()
-
-# Get the API key from the environment
-api_key = os.environ.get("OPENAI_API_KEY")
-
-# Initialize the language model
-llm = OpenAIChat(
- temperature=0.5,
- openai_api_key=api_key,
-)
-
-## Initialize the workflow
-agent = Agent(llm=llm, max_loops=1, agent_name="Social Media Manager")
-agent2 = Agent(llm=llm, max_loops=1, agent_name=" Product Manager")
-agent3 = Agent(llm=llm, max_loops=1, agent_name="SEO Manager")
-
-
-# Load the swarmnet with the agents
-swarmnet = SwarmNetwork(
- agents=[agent, agent2, agent3],
-)
-
-# List the agents in the swarm network
-out = swarmnet.list_agents()
-print(out)
-
-# Run the workflow on a task
-out = swarmnet.run_single_agent(
- agent2.id, "Generate a 10,000 word blog on health and wellness."
-)
-print(out)
-
-
-# Run all the agents in the swarm network on a task
-out = swarmnet.run_many_agents(
- "Generate a 10,000 word blog on health and wellness."
-)
-print(out)
-
-```
-
---
# Features 🤖
@@ -688,7 +736,7 @@ Swarms framework is not just a tool but a robust, scalable, and secure partner i
## Documentation
-- For documentation, go here, [swarms.apac.ai](https://swarms.apac.ai)
+- Our documentation is available at [swarms.apac.ai](https://swarms.apac.ai)
## 🫶 Contributions:
@@ -709,7 +757,7 @@ To see how to contribute, visit [Contribution guidelines](https://github.com/kye
## Discovery Call
-Book a discovery call with the Swarms team to learn how to optimize and scale your swarm! [Click here to book a time that works for you!](https://calendly.com/swarm-corp/30min?month=2023-11)
+Book a discovery call to learn how Swarms can lower your operating costs by 40% with swarms of autonomous agents at lightspeed. [Click here to book a time that works for you!](https://calendly.com/swarm-corp/30min?month=2023-11)
# License
Apache License
diff --git a/pyproject.toml b/pyproject.toml
index 5c30cca4..e5fb11f0 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -39,7 +39,7 @@ backoff = "2.2.1"
marshmallow = "3.19.0"
datasets = "2.10.1"
optimum = "1.15.0"
-diffusers = "0.17.1"
+diffusers = "*"
PyPDF2 = "3.0.1"
accelerate = "0.22.0"
sentencepiece = "0.1.98"
diff --git a/requirements.txt b/requirements.txt
index 2ac7e502..d0dedf2e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -40,7 +40,7 @@ albumentations
basicsr
termcolor==2.2.0
controlnet-aux
-diffusers==0.17.1
+diffusers
einops==0.7.0
imageio==2.25.1
opencv-python-headless==4.8.1.78
diff --git a/swarms/models/kosmos_two.py b/swarms/models/kosmos_two.py
index b953ddf9..6bc4d810 100644
--- a/swarms/models/kosmos_two.py
+++ b/swarms/models/kosmos_two.py
@@ -10,6 +10,7 @@ from transformers import AutoModelForVision2Seq, AutoProcessor
from swarms.models.base_multimodal_model import BaseMultiModalModel
+
# utils
def is_overlapping(rect1, rect2):
x1, y1, x2, y2 = rect1
diff --git a/swarms/structs/task.py b/swarms/structs/task.py
index b96390b8..9c0f8dac 100644
--- a/swarms/structs/task.py
+++ b/swarms/structs/task.py
@@ -103,6 +103,12 @@ class Task:
except Exception as error:
print(f"[ERROR][Task] {error}")
+    def run(self):
+        """Execute the task; alias for execute()."""
+        self.execute()
+
+    def __call__(self):
+        """Allow a Task instance to be invoked directly: task()."""
+        self.execute()
+
def handle_scheduled_task(self):
"""
Handles the execution of a scheduled task.
diff --git a/swarms/swarms/__init__.py b/swarms/swarms/__init__.py
index ed39abfb..38ee6fb9 100644
--- a/swarms/swarms/__init__.py
+++ b/swarms/swarms/__init__.py
@@ -2,6 +2,7 @@ from swarms.structs.autoscaler import AutoScaler
from swarms.swarms.model_parallizer import ModelParallelizer
from swarms.swarms.multi_agent_collab import MultiAgentCollaboration
from swarms.swarms.base import AbstractSwarm
+
# from swarms.swarms.team import Team
__all__ = [
diff --git a/swarms/swarms/team.py b/swarms/swarms/team.py
index d4482db9..36c773e2 100644
--- a/swarms/swarms/team.py
+++ b/swarms/swarms/team.py
@@ -21,12 +21,16 @@ class Team(BaseModel):
"""
tasks: Optional[List[Task]] = Field(description="List of tasks")
- agents: Optional[List[Agent]] = Field(description="List of agents in this Team.")
+ agents: Optional[List[Agent]] = Field(
+ description="List of agents in this Team."
+ )
architecture = Field(
- description="architecture that the Team will follow.", default="sequential"
+ description="architecture that the Team will follow.",
+ default="sequential",
)
verbose: bool = Field(
- description="Verbose mode for the Agent Execution", default=False
+ description="Verbose mode for the Agent Execution",
+ default=False,
)
config: Optional[Json] = Field(
description="Configuration of the Team.", default=None
@@ -37,19 +41,27 @@ class Team(BaseModel):
if not values.get("config") and (
not values.get("agents") and not values.get("tasks")
):
- raise ValueError("Either agents and task need to be set or config.")
+ raise ValueError(
+ "Either agents and task need to be set or config."
+ )
if values.get("config"):
config = json.loads(values.get("config"))
if not config.get("agents") or not config.get("tasks"):
- raise ValueError("Config should have agents and tasks.")
+ raise ValueError(
+ "Config should have agents and tasks."
+ )
- values["agents"] = [Agent(**agent) for agent in config["agents"]]
+ values["agents"] = [
+ Agent(**agent) for agent in config["agents"]
+ ]
tasks = []
for task in config["tasks"]:
task_agent = [
- agt for agt in values["agents"] if agt.role == task["agent"]
+ agt
+ for agt in values["agents"]
+ if agt.role == task["agent"]
][0]
del task["agent"]
tasks.append(Task(**task, agent=task_agent))
@@ -92,4 +104,4 @@ class Team(BaseModel):
def __log(self, message):
if self.verbose:
- print(message)
\ No newline at end of file
+ print(message)
diff --git a/task.py b/task.py
new file mode 100644
index 00000000..089cb263
--- /dev/null
+++ b/task.py
@@ -0,0 +1,47 @@
+from swarms.structs import Task, Agent
+from swarms.models import OpenAIChat
+from dotenv import load_dotenv
+import os
+
+
+# Load the environment variables
+load_dotenv()
+
+
+# Define a function to be used as the action
+def my_action():
+ print("Action executed")
+
+
+# Define a function to be used as the condition
+def my_condition():
+ print("Condition checked")
+ return True
+
+
+# Create an agent
+agent = Agent(
+ llm=OpenAIChat(openai_api_key=os.environ["OPENAI_API_KEY"]),
+ max_loops=1,
+ dashboard=False,
+)
+
+# Create a task
+task = Task(description="What's the weather in Miami", agent=agent)
+
+# Set the action and condition
+task.set_action(my_action)
+task.set_condition(my_condition)
+
+# Execute the task
+print("Executing task...")
+task.run()
+
+# Check if the task is completed
+if task.is_completed():
+ print("Task completed")
+else:
+ print("Task not completed")
+
+# Output the result of the task
+print(f"Task result: {task.result}")
diff --git a/tests/models/test_ssd_1b.py b/tests/models/test_ssd_1b.py
index 95f322ef..39e4264e 100644
--- a/tests/models/test_ssd_1b.py
+++ b/tests/models/test_ssd_1b.py
@@ -162,5 +162,3 @@ def test_ssd1b_repr_str(ssd1b_model):
image_url = ssd1b_model(task)
assert repr(ssd1b_model) == f"SSD1B(image_url={image_url})"
assert str(ssd1b_model) == f"SSD1B(image_url={image_url})"
-
-
diff --git a/tests/tools/test_base.py b/tests/tools/test_tools_base.py
similarity index 100%
rename from tests/tools/test_base.py
rename to tests/tools/test_tools_base.py
diff --git a/tests/utils/test_class_args_wrapper.py b/tests/utils/test_class_args_wrapper.py
index 67a8a7fa..a222ffe9 100644
--- a/tests/utils/test_class_args_wrapper.py
+++ b/tests/utils/test_class_args_wrapper.py
@@ -3,7 +3,6 @@ from io import StringIO
from contextlib import redirect_stdout
from swarms.utils.class_args_wrapper import print_class_parameters
from swarms.structs.agent import Agent
-from swarms.structs.autoscaler import Autoscaler
from fastapi import FastAPI
from fastapi.testclient import TestClient
@@ -23,19 +22,6 @@ def test_print_class_parameters_agent():
assert output == expected_output
-def test_print_class_parameters_autoscaler():
- f = StringIO()
- with redirect_stdout(f):
- print_class_parameters(Autoscaler)
- output = f.getvalue().strip()
- # Replace with the expected output for Autoscaler class
- expected_output = (
- "Parameter: min_agents, Type: \nParameter:"
- " max_agents, Type: "
- )
- assert output == expected_output
-
-
def test_print_class_parameters_error():
with pytest.raises(TypeError):
print_class_parameters("Not a class")
@@ -43,7 +29,7 @@ def test_print_class_parameters_error():
@app.get("/parameters/{class_name}")
def get_parameters(class_name: str):
- classes = {"Agent": Agent, "Autoscaler": Autoscaler}
+ classes = {"Agent": Agent}
if class_name in classes:
return print_class_parameters(
classes[class_name], api_format=True
@@ -63,17 +49,6 @@ def test_get_parameters_agent():
assert response.json() == expected_output
-def test_get_parameters_autoscaler():
- response = client.get("/parameters/Autoscaler")
- assert response.status_code == 200
- # Replace with the expected output for Autoscaler class
- expected_output = {
- "min_agents": "",
- "max_agents": "",
- }
- assert response.json() == expected_output
-
-
def test_get_parameters_not_found():
response = client.get("/parameters/NonexistentClass")
assert response.status_code == 200
diff --git a/tests/utils/test_subprocess_code_interpreter.py b/tests/utils/test_subprocess_code_interpreter.py
index 01f45da4..3ce54530 100644
--- a/tests/utils/test_subprocess_code_interpreter.py
+++ b/tests/utils/test_subprocess_code_interpreter.py
@@ -75,6 +75,3 @@ def test_handle_stream_output(interpreter, monkeypatch):
monkeypatch.setattr("sys.stdout", mock_readline())
# More test code needed here to simulate and assert the behavior of handle_stream_output
-
-
-# More tests needed for run method, error handling, and edge cases.