`Flow` docs and tests

pull/76/head
Kye 1 year ago
parent 5441fc17c2
commit 3944c0638b

1
.gitignore vendored

@ -24,6 +24,7 @@ stderr_log.txt
__pycache__/ __pycache__/
*.py[cod] *.py[cod]
*$py.class *$py.class
error.txt
# C extensions # C extensions
*.so *.so

@ -196,9 +196,11 @@ print(out)
## Focus ## Focus
- We are radically devoted to creating outcomes that our users want, we believe this is only possible by focusing extensively on reliability, scalability, and agility. We are radically devoted to creating outcomes that our users want, we believe this is only possible by focusing extensively on reliability, scalability, and agility.
- An Agent's purpose is to satisfy your wants and needs and so this is our only focus, we believe this is only possible by investing impeccable detail into agent structure design in other words gluing together an llm with tools and memory in a way that delights users and executes tasks exactly how users want them to be executed.
- The reliability of communication in a swarm is also extremely critical to your success and with this in mind we carefully craft and extensively test our structures. An Agent's purpose is to satisfy your wants and needs and so this is our only focus, we believe this is only possible by investing impeccable detail into agent structure design in other words gluing together an llm with tools and memory in a way that delights users and executes tasks exactly how users want them to be executed.
The reliability of communication in a swarm is also extremely critical to your success and with this in mind we carefully craft and extensively test our structures.
- Reliability. - Reliability.
- Scalability. - Scalability.

@ -0,0 +1,152 @@
# `Flow` Documentation
## Overview
The `Flow` class is a Python module designed to facilitate interactions with a language model, particularly one that operates as an autonomous agent. This class is part of a larger framework aimed at creating conversational agents using advanced language models like GPT-3. It enables you to establish a conversational loop with the model, generate responses, collect feedback, and control the flow of the conversation.
In this documentation, you will learn how to use the `Flow` class effectively, its purpose, and how it can be integrated into your projects.
## Purpose
The `Flow` class serves several key purposes:
1. **Conversational Loop**: It establishes a conversational loop with a language model. This means it allows you to interact with the model in a back-and-forth manner, taking turns in the conversation.
2. **Feedback Collection**: The class allows users to provide feedback on the responses generated by the model. This feedback can be valuable for training and improving the model's responses over time.
3. **Stoppable Conversation**: You can define custom stopping conditions for the conversation, allowing you to stop the interaction based on specific criteria. For example, you can stop the conversation if a certain keyword is detected in the responses.
4. **Retry Mechanism**: The class includes a retry mechanism that can be helpful if there are issues generating responses from the model. It attempts to generate a response multiple times before raising an error.
## Class Definition
The `Flow` class has the following constructor:
```python
class Flow:
def __init__(
self,
llm: Any,
max_loops: int = 5,
stopping_condition: Optional[Callable[[str], bool]] = None,
loop_interval: int = 1,
retry_attempts: int = 3,
retry_interval: int = 1,
interactive: bool = False,
**kwargs: Any,
):
```
### Parameters
- `llm` (Any): The language model with which you want to interact.
- `max_loops` (int): The maximum number of conversation loops. Default is 5.
- `stopping_condition` (Optional[Callable[[str], bool]]): A custom stopping condition function. Default is `None`.
- `loop_interval` (int): The time interval (in seconds) between conversation loops. Default is 1 second.
- `retry_attempts` (int): The number of retry attempts if response generation fails. Default is 3.
- `retry_interval` (int): The time interval (in seconds) between retry attempts. Default is 1 second.
- `interactive` (bool): Set to `True` if the conversation is interactive, meaning the user is involved. Default is `False`.
## Usage
The `Flow` class can be used to create a conversational loop with the language model. Here's how you can use it:
```python
from swarms.structs import Flow
flow = Flow(llm=my_language_model, max_loops=5)
# Define a starting task or message
initial_task = "Hello, can you provide me with some information?"
# Run the conversation loop
final_response = flow.run(initial_task)
```
### Feedback
You can collect feedback during the conversation using the `provide_feedback` method:
```python
flow.provide_feedback("The response was not accurate.")
```
### Stopping Condition
You can define a custom stopping condition using a function. For example, you can stop the conversation if the response contains the word "Stop":
```python
from swarms.structs import Flow
def stop_when_repeats(response: str) -> bool:
return "Stop" in response.lower()
flow = Flow(llm=my_language_model, max_loops=5, stopping_condition=stop_when_repeats)
```
### Retry Mechanism
If the response generation fails, the class will retry up to the specified number of attempts:
```python
flow = Flow(llm=my_language_model, max_loops=5, retry_attempts=3)
```
## Additional Information
- To save the conversation history to a file, you can use the `save` method.
- To load a previously saved conversation history, you can use the `load` method.
- The class includes methods for bulk running conversations with multiple input sets.
## Examples
Here are three usage examples:
### Example 1: Simple Conversation
```python
from swarms.structs import Flow
flow = Flow(llm=my_language_model, max_loops=5)
# Define a starting task or message
initial_task = "Hello, can you provide me with some information?"
# Run the conversation loop
final_response = flow.run(initial_task)
```
### Example 2: Custom Stopping Condition
```python
from swarms.structs import Flow
def stop_when_repeats(response: str) -> bool:
return "Stop" in response.lower()
flow = Flow(llm=my_language_model, max_loops=5, stopping_condition=stop_when_repeats)
```
### Example 3: Interactive Conversation
```python
from swarms.structs import Flow
flow = Flow(llm=my_language_model, max_loops=5, interactive=True)
# Provide initial task
initial_task = "Hello, can you tell me a joke?"
# Run the conversation loop
final_response = flow.run(initial_task)
```
## References and Resources
- [GitHub Repository](https://github.com/kyegomez/swarms)
## Conclusion
The `Flow` class provides a powerful way to interact with language models in a conversational manner. By defining custom stopping conditions, collecting feedback, and controlling the flow of the conversation, you can create engaging and interactive applications that make use of advanced language models.

@ -0,0 +1,22 @@
message='Request to OpenAI API' method=post path=https://api.openai.com/v1/chat/completions
api_version=None data='{"messages": [{"role": "user", "content": "Generate a 10,000 word blog, say Stop when done"}], "model": "gpt-3.5-turbo", "temperature": 0.5}' message='Post details'
Converted retries value: 2 -> Retry(total=2, connect=None, read=None, redirect=None, status=None)
Starting new HTTPS connection (1): api.openai.com:443
https://api.openai.com:443 "POST /v1/chat/completions HTTP/1.1" 200 None
message='OpenAI API response' path=https://api.openai.com/v1/chat/completions processing_ms=15480 request_id=52b27e863ef2b6e31c0c591d736f233b response_code=200
message='Request to OpenAI API' method=post path=https://api.openai.com/v1/chat/completions
api_version=None data='{"messages": [{"role": "user", "content": "Title: The Power of Positive Thinking: Transforming Your Life One Thought at a Time\\n\\nIntroduction (500 words):\\nIn a world filled with challenges, uncertainties, and negativity, it is more important than ever to harness the power of positive thinking. Our thoughts have the incredible ability to shape our reality, influence our actions, and ultimately determine the quality of our lives. By cultivating a positive mindset, we can navigate through life\'s obstacles with grace, find joy in the simplest of moments, and create a life that is truly fulfilling.\\n\\nChapter 1: Understanding Positive Thinking (1000 words)\\n- Exploring the concept of positive thinking and its impact on our mental and emotional well-being.\\n- The science behind positive thinking: how our thoughts affect our brain chemistry.\\n- Debunking common misconceptions about positive thinking.\\n- The benefits of cultivating a positive mindset.\\n\\nChapter 2: Overcoming Negative Thought Patterns (1200 words)\\n- Identifying and challenging negative thought patterns that hold us back.\\n- Techniques for reframing negative thoughts into positive ones.\\n- The role of self-awareness in recognizing and changing negative thinking.\\n- Strategies to break free from self-sabotaging behaviors.\\n\\nChapter 3: The Power of Affirmations (1000 words)\\n- Understanding the concept of affirmations and their effectiveness in rewiring our subconscious mind.\\n- Creating powerful affirmations that resonate with our goals and desires.\\n- Incorporating affirmations into our daily routine for maximum impact.\\n- Tips for overcoming skepticism and embracing the power of affirmations.\\n\\nChapter 4: Gratitude: The Key to Abundance (1200 words)\\n- Discovering the transformative power of gratitude in our lives.\\n- The science behind gratitude: how it rewires our brain and boosts our well-being.\\n- Practical ways to cultivate gratitude on a daily basis.\\n- The ripple effect of gratitude: how expressing gratitude can positively impact our relationships and overall happiness.\\n\\nChapter 5: Cultivating a Positive Mindset in Challenging Times (1500 words)\\n- Strategies for maintaining a positive mindset during times of adversity.\\n- The importance of resilience and bouncing back from setbacks.\\n- Techniques for shifting our focus from problems to solutions.\\n- Finding silver linings and opportunities for growth in difficult situations.\\n\\nChapter 6: Surrounding Yourself with Positive Influences (1000 words)\\n- The impact of our environment and the people we surround ourselves with on our mindset.\\n- Identifying toxic relationships and creating boundaries.\\n- Building a supportive network of like-minded individuals.\\n- The power of role models and mentors in shaping our positive mindset.\\n\\nChapter 7: Nurturing Self-Care and Emotional Well-being (1500 words)\\n- The connection between self-care, emotional well-being, and positive thinking.\\n- Practical self-care practices to enhance our mental and emotional health.\\n- The importance of self-compassion and forgiveness in maintaining a positive mindset.\\n- Managing stress and anxiety through self-care rituals.\\n\\nChapter 8: Harnessing the Power of Visualization (1200 words)\\n- Understanding the concept of visualization and its role in manifesting our desires.\\n- Techniques for effective visualization exercises.\\n- Creating a vision board to amplify the power of visualization.\\n- The link between visualization, motivation, and goal achievement.\\n\\nChapter 9: Embracing Failure as a Stepping Stone to Success (1000 words)\\n- Changing our perspective on failure and embracing it as a valuable learning opportunity.\\n- Overcoming fear of failure and taking calculated risks.\\n- The role of resilience in bouncing back from failures.\\n- Inspiring stories of successful individuals who turned failures into triumphs.\\n\\nChapter 10: Spreading Positivity: Making a Difference in the World (1000 words)\\n- The ripple effect of our positive mindset on the world around us.\\n- The power of kindness, compassion, and empathy in creating a positive impact.\\n- Ways to spread positivity in our communities and make a difference.\\n- Inspiring examples of individuals who have made significant positive change.\\n\\nConclusion (500 words):\\nAs we reach the end of this blog, it is essential to remember that cultivating a positive mindset is a lifelong journey. It requires consistent effort, self-reflection, and a commitment to growth. By embracing the power of positive thinking, we can transform our lives, create meaningful connections, and contribute to a more harmonious world. So, let us take a deep breath, embrace the present moment, and embark on this beautiful journey towards a life filled with positivity and fulfillment. Stop."}], "model": "gpt-3.5-turbo", "temperature": 0.5}' message='Post details'
https://api.openai.com:443 "POST /v1/chat/completions HTTP/1.1" 200 None
message='OpenAI API response' path=https://api.openai.com/v1/chat/completions processing_ms=810 request_id=d019dd7df7fc6de9e1b23187c88eb13e response_code=200
message='Request to OpenAI API' method=post path=https://api.openai.com/v1/chat/completions
api_version=None data='{"messages": [{"role": "user", "content": "Take a moment to reflect on your thoughts and start transforming your life one thought at a time. The power of positive thinking is within your reach."}], "model": "gpt-3.5-turbo", "temperature": 0.5}' message='Post details'
https://api.openai.com:443 "POST /v1/chat/completions HTTP/1.1" 200 None
message='OpenAI API response' path=https://api.openai.com/v1/chat/completions processing_ms=5871 request_id=ef1d28a2b6e3b7f043fbbc7c5694db2d response_code=200
message='Request to OpenAI API' method=post path=https://api.openai.com/v1/chat/completions
api_version=None data='{"messages": [{"role": "user", "content": "When we pause to reflect on our thoughts, we gain the opportunity to assess their impact on our lives. It is easy to get caught up in negative thinking patterns, allowing them to shape our actions and outcomes. However, by consciously choosing positive thoughts, we can begin to transform our lives.\\n\\nPositive thinking is not about denying reality or ignoring challenges; it is about approaching them with a mindset that focuses on possibilities, solutions, and growth. It is about acknowledging the difficulties but believing in our ability to overcome them.\\n\\nBy embracing positive thinking, we can rewire our brains to seek out the good in every situation. We can cultivate gratitude, resilience, and optimism. This shift in mindset empowers us to face challenges with confidence, find opportunities in setbacks, and maintain a sense of hope and joy even in difficult times.\\n\\nThe power of positive thinking lies within our reach, but it requires consistent effort and self-awareness. We must actively challenge negative thoughts and replace them with positive ones. We can practice affirmations, surround ourselves with uplifting people and environments, and engage in activities that bring us joy and fulfillment.\\n\\nTransforming our lives one thought at a time is a gradual process, but the cumulative effect can be profound. As we choose positive thoughts, we attract positive experiences and relationships. We become more resilient, adaptable, and open to growth. We inspire and uplift others, creating a ripple effect of positivity in our communities.\\n\\nSo, let us take a moment to reflect on our thoughts and commit to embracing the power of positive thinking. Let us be mindful of our inner dialogue, challenging negative beliefs, and replacing them with empowering thoughts. By doing so, we can create a life filled with happiness, success, and fulfillment."}], "model": "gpt-3.5-turbo", "temperature": 0.5}' message='Post details'
https://api.openai.com:443 "POST /v1/chat/completions HTTP/1.1" 200 None
message='OpenAI API response' path=https://api.openai.com/v1/chat/completions processing_ms=4178 request_id=2ab14a8da2767c28bdd983ad66412844 response_code=200
message='Request to OpenAI API' method=post path=https://api.openai.com/v1/chat/completions
api_version=None data='{"messages": [{"role": "user", "content": "Positive thinking has the potential to transform our lives. It is not about denying reality or ignoring challenges, but rather about approaching them with a mindset that focuses on possibilities, solutions, and growth. By consciously choosing positive thoughts, we can rewire our brains to seek out the good in every situation.\\n\\nEmbracing positive thinking requires consistent effort and self-awareness. We must actively challenge negative thoughts and replace them with positive ones. This can be done through affirmations, surrounding ourselves with uplifting people and environments, and engaging in activities that bring us joy and fulfillment.\\n\\nTransforming our lives through positive thinking is a gradual process, but the cumulative effect can be profound. As we choose positive thoughts, we attract positive experiences and relationships. We become more resilient, adaptable, and open to growth. We also inspire and uplift others, creating a ripple effect of positivity in our communities.\\n\\nLet us take a moment to reflect on our thoughts and commit to embracing the power of positive thinking. By being mindful of our inner dialogue, challenging negative beliefs, and replacing them with empowering thoughts, we can create a life filled with happiness, success, and fulfillment."}], "model": "gpt-3.5-turbo", "temperature": 0.5}' message='Post details'
https://api.openai.com:443 "POST /v1/chat/completions HTTP/1.1" 200 None
message='OpenAI API response' path=https://api.openai.com/v1/chat/completions processing_ms=4757 request_id=b5ce2a7c927910ace331c15b091eb943 response_code=200

File diff suppressed because one or more lines are too long

@ -109,6 +109,7 @@ nav:
- swarms.structs: - swarms.structs:
- Overview: "swarms/structs/overview.md" - Overview: "swarms/structs/overview.md"
- Workflow: "swarms/structs/workflow.md" - Workflow: "swarms/structs/workflow.md"
- Flow: "swarms/structs/flow.md"
- swarms.memory: - swarms.memory:
- PineconeVectorStoreStore: "swarms/memory/pinecone.md" - PineconeVectorStoreStore: "swarms/memory/pinecone.md"
- PGVectorStore: "swarms/memory/pg.md" - PGVectorStore: "swarms/memory/pg.md"

@ -15,6 +15,3 @@ from swarms.models import * # import * only works when __all__ = [] is defined
from swarms.structs import * from swarms.structs import *
from swarms.swarms import * from swarms.swarms import *
from swarms.agents import * from swarms.agents import *
from swarms.logo import print_colored_logo
print_colored_logo()

@ -1,55 +0,0 @@
from rich import print as rich_print
from rich.markdown import Markdown
from rich.rule import Rule
from termcolor import colored, cprint
def display_markdown_message(message):
"""
Display markdown message. Works with multiline strings with lots of indentation.
Will automatically make single line > tags beautiful.
"""
for line in message.split("\n"):
line = line.strip()
if line == "":
print("")
elif line == "---":
rich_print(Rule(style="white"))
else:
rich_print(Markdown(line))
if "\n" not in message and message.startswith(">"):
# Aesthetic choice. For these tags, they need a space below them
print("")
logo = """
________ _ _______ _______ _____ ______
/ ___/\ \/ \/ /\__ \\_ __ \/ \ / ___/
\___ \ \ / / __ \| | \/ Y Y \\___ \
/____ > \/\_/ (____ /__| |__|_| /____ >
\/ \/ \/ \/
"""
logo2 = """
_________ __ __ _____ __________ _____ _________
/ _____// \ / \ / _ \ \______ \ / \ / _____/
\_____ \ \ \/\/ // /_\ \ | _/ / \ / \ \_____ \
/ \ \ // | \| | \/ Y \ / \
/_______ / \__/\ / \____|__ /|____|_ /\____|__ //_______ /
\/ \/ \/ \/ \/ \/
"""
def print_colored_logo():
with open("swarms/logo.txt", "r") as file:
logo = file.read()
text = colored(logo, "red")
print(text)
# # Call the function
# print_colored_logo()

@ -1,7 +0,0 @@
_________ __ __ _____ __________ _____ _________
/ _____// \ / \ / _ \ \______ \ / \ / _____/
\_____ \ \ \/\/ // /_\ \ | _/ / \ / \ \_____ \
/ \ \ // | \| | \/ Y \ / \
/_______ / \__/\ / \____|__ /|____|_ /\____|__ //_______ /
\/ \/ \/ \/ \/ \/

@ -6,7 +6,7 @@ from swarms.models.openai_models import OpenAI, AzureOpenAI, OpenAIChat
from swarms.models.zephyr import Zephyr from swarms.models.zephyr import Zephyr
from swarms.models.biogpt import BioGPT from swarms.models.biogpt import BioGPT
from swarms.models.huggingface import HuggingfaceLLM from swarms.models.huggingface import HuggingfaceLLM
from swarms.models.wizard_storyteller import WizardLLMStoryTeller from swarms.models.wizard_storytelling import WizardLLMStoryTeller
from swarms.models.mpt import MPT7B from swarms.models.mpt import MPT7B
@ -21,7 +21,7 @@ from swarms.models.layoutlm_document_qa import LayoutLMDocumentQA
# from swarms.models.fuyu import Fuyu # Not working, wait until they update # from swarms.models.fuyu import Fuyu # Not working, wait until they update
import sys import sys
log_file = open("stderr_log.txt", "w") log_file = open("errors.txt", "w")
sys.stderr = log_file sys.stderr = log_file

File diff suppressed because it is too large Load Diff

@ -44,10 +44,8 @@ flow.save("path/flow.yaml")
import json import json
import logging import logging
import time import time
from typing import Any, Callable, Dict, List, Optional, Union from typing import Any, Callable, Dict, List, Optional
from pathlib import Path from termcolor import colored
import yaml
# Custome stopping condition # Custome stopping condition
@ -61,11 +59,12 @@ class Flow:
self, self,
llm: Any, llm: Any,
# template: str, # template: str,
max_loops: int = 1, max_loops: int = 5,
stopping_condition: Optional[Callable[[str], bool]] = None, stopping_condition: Optional[Callable[[str], bool]] = None,
loop_interval: int = 1, loop_interval: int = 1,
retry_attempts: int = 3, retry_attempts: int = 3,
retry_interval: int = 1, retry_interval: int = 1,
interactive: bool = False,
**kwargs: Any, **kwargs: Any,
): ):
self.llm = llm self.llm = llm
@ -77,6 +76,9 @@ class Flow:
self.retry_interval = retry_interval self.retry_interval = retry_interval
self.feedback = [] self.feedback = []
self.memory = [] self.memory = []
self.task = None
self.stopping_token = "<DONE>"
self.interactive = interactive
def provide_feedback(self, feedback: str) -> None: def provide_feedback(self, feedback: str) -> None:
"""Allow users to provide feedback on the responses.""" """Allow users to provide feedback on the responses."""
@ -98,28 +100,43 @@ class Flow:
"""Format the template with the provided kwargs using f-string interpolation.""" """Format the template with the provided kwargs using f-string interpolation."""
return template.format(**kwargs) return template.format(**kwargs)
def run(self, task: str): # formatted_prompts: str) -> str: def run(self, task: str):
""" """
Generate a result using the lm with optional query loops and stopping conditions. Run the autonomous agent loop
Args:
task (str): The initial task to run
Flow:
1. Generate a response
2. Check stopping condition
3. If stopping condition is met, stop
4. If stopping condition is not met, generate a response
5. Repeat until stopping condition is met or max_loops is reached
""" """
response = task response = task
history = [task] history = [task]
for _ in range(self.max_loops): for i in range(self.max_loops):
print(colored(f"\nLoop {i+1} of {self.max_loops}", 'blue'))
print("\n")
if self._check_stopping_condition(response): if self._check_stopping_condition(response):
break break
attempt = 0 attempt = 0
while attempt < self.retry_attempts: while attempt < self.retry_attempts:
try: try:
response = self.llm(response) response = self.llm(response)
print(f"Next query: {response}")
break break
except Exception as e: except Exception as e:
logging.error(f"Error generating response: {e}") logging.error(f"Error generating response: {e}")
attempt += 1 attempt += 1
time.sleep(self.retry_interval) time.sleep(self.retry_interval)
logging.info(f"Generated response: {response}")
history.append(response) history.append(response)
time.sleep(self.loop_interval) time.sleep(self.loop_interval)
return response, history self.memory.append(history)
return response #, history
def _run(self, **kwargs: Any) -> str: def _run(self, **kwargs: Any) -> str:
"""Generate a result using the provided keyword args.""" """Generate a result using the provided keyword args."""
@ -145,46 +162,11 @@ class Flow:
return Flow(llm=llm, template=template) return Flow(llm=llm, template=template)
def save(self, file_path) -> None: def save(self, file_path) -> None:
"""Save the flow. with open(file_path, 'w') as f:
json.dump(self.memory, f)
Expects `Flow._flow_type` property to be implemented and for memory to be print(f"Saved flow history to {file_path}")
null.
def load(self, file_path) -> None:
Args: with open(file_path, 'r') as f:
file_path: Path to file to save the flow to. self.memory = json.load(f)
print(f"Loaded flow history from {file_path}")
Example:
.. code-block:: python
flow.save(file_path="path/flow.yaml")
TODO: Save memory list and not dict.
"""
if self.memory is not None:
raise ValueError("Saving of memory is not yet supported.")
# Fetch dictionary to save
flow_dict = self.dict()
if "_type" not in flow_dict:
raise NotImplementedError(f"Flow {self} does not support saving.")
# Convert file to Path object.
if isinstance(file_path, str):
save_path = Path(file_path)
else:
save_path = file_path
directory_path = save_path.parent
directory_path.mkdir(parents=True, exist_ok=True)
if save_path.suffix == ".json":
with open(file_path, "w") as f:
json.dump(flow_dict, f, indent=4)
print(f"Saved Flow to JSON file: {file_path}")
elif save_path.suffix == ".yaml":
with open(file_path, "w") as f:
yaml.dump(flow_dict, f, default_flow_style=False)
print(f"Saved flow history to {file_path} as YAML")
else:
raise ValueError(f"{save_path} must be json or yaml")

@ -1,7 +1,9 @@
from __future__ import annotations from __future__ import annotations
import uuid
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from swarms.structs.task import Task
class Workflow: class Workflow:
@ -11,49 +13,27 @@ class Workflow:
They string together multiple tasks of varying types, and can use Short-Term Memory They string together multiple tasks of varying types, and can use Short-Term Memory
or pass specific arguments downstream. or pass specific arguments downstream.
Usage Usage
llm = LLM() llm = LLM()
workflow = Workflow(llm) workflow = Workflow(llm)
workflow.add("What's the weather in miami") workflow.add("What's the weather in miami")
workflow.add("Provide detauls for {{ parent_output }}") workflow.add("Provide details for {{ parent_output }}")
workflow.add("Summarize the above information: {{ parent_output}}) workflow.add("Summarize the above information: {{ parent_output}})
workflow.run() workflow.run()
""" """
class Task:
def __init__(self, task: str):
self.task = task
self.parents = []
self.children = []
self.output = None
self.structure = None
def add_child(self, child: "Workflow.Task"):
self.children.append(child)
child.parents.append(self)
child.structure = self.structure
def execute(self) -> Any:
prompt = self.task.replace(
"{{ parent_input }}", self.parents[0].output if self.parents else ""
)
response = self.structure.agent.run(prompt)
self.output = response
return response
def __init__(self, agent, parallel: bool = False): def __init__(self, agent, parallel: bool = False):
"""__init__""" """__init__"""
self.agent = agent self.agent = agent
self.tasks: List[Workflow.Task] = [] self.tasks: List[Task] = []
self.parallel = parallel self.parallel = parallel
def add(self, task: str) -> Task: def add(self, task: str) -> Task:
"""Add a task""" """Add a task"""
task = self.Task(task) task = Task(task_id=uuid.uuid4().hex, input=task)
if self.last_task(): if self.last_task():
self.last_task().add_child(task) self.last_task().add_child(task)
@ -70,9 +50,9 @@ class Workflow:
"""Last task""" """Last task"""
return self.tasks[-1] if self.tasks else None return self.tasks[-1] if self.tasks else None
def run(self, *args) -> Task: def run(self, task: str) -> Task:
"""Run tasks""" """Run tasks"""
[task.reset() for task in self.tasks] self.add(task)
if self.parallel: if self.parallel:
with ThreadPoolExecutor() as executor: with ThreadPoolExecutor() as executor:

@ -0,0 +1,219 @@
import pytest
from unittest.mock import patch
import os
from swarms.structs.flow import Flow, stop_when_repeats
from swarms.models import OpenAIChat
from dotenv import load_dotenv
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
# Mocks and Fixtures
@pytest.fixture
def mocked_llm():
return OpenAIChat(
openai_api_key=openai_api_key,
)
@pytest.fixture
def basic_flow(mocked_llm):
return Flow(llm=mocked_llm, max_loops=5)
@pytest.fixture
def flow_with_condition(mocked_llm):
return Flow(llm=mocked_llm, max_loops=5, stopping_condition=stop_when_repeats)
# Basic Tests
def test_stop_when_repeats():
assert stop_when_repeats("Please Stop now")
assert not stop_when_repeats("Continue the process")
def test_flow_initialization(basic_flow):
assert basic_flow.max_loops == 5
assert basic_flow.stopping_condition is None
assert basic_flow.loop_interval == 1
assert basic_flow.retry_attempts == 3
assert basic_flow.retry_interval == 1
assert basic_flow.feedback == []
assert basic_flow.memory == []
assert basic_flow.task is None
assert basic_flow.stopping_token == "<DONE>"
assert not basic_flow.interactive
def test_provide_feedback(basic_flow):
feedback = "Test feedback"
basic_flow.provide_feedback(feedback)
assert feedback in basic_flow.feedback
@patch('time.sleep', return_value=None) # to speed up tests
def test_run_without_stopping_condition(mocked_sleep, basic_flow):
response = basic_flow.run("Test task")
assert response == "Test task" # since our mocked llm doesn't modify the response
@patch('time.sleep', return_value=None) # to speed up tests
def test_run_with_stopping_condition(mocked_sleep, flow_with_condition):
response = flow_with_condition.run("Stop")
assert response == "Stop"
@patch('time.sleep', return_value=None) # to speed up tests
def test_run_with_exception(mocked_sleep, basic_flow):
basic_flow.llm.side_effect = Exception("Test Exception")
with pytest.raises(Exception, match="Test Exception"):
basic_flow.run("Test task")
def test_bulk_run(basic_flow):
inputs = [{"task": "Test1"}, {"task": "Test2"}]
responses = basic_flow.bulk_run(inputs)
assert responses == ["Test1", "Test2"]
# Tests involving file IO
def test_save_and_load(basic_flow, tmp_path):
file_path = tmp_path / "memory.json"
basic_flow.memory.append(["Test1", "Test2"])
basic_flow.save(file_path)
new_flow = Flow(llm=mocked_llm, max_loops=5)
new_flow.load(file_path)
assert new_flow.memory == [["Test1", "Test2"]]
# Environment variable mock test
def test_env_variable_handling(monkeypatch):
monkeypatch.setenv("API_KEY", "test_key")
assert os.getenv("API_KEY") == "test_key"
# TODO: Add more tests, especially edge cases and exception cases. Implement parametrized tests for varied inputs.
# Test initializing the flow with different stopping conditions
def test_flow_with_custom_stopping_condition(mocked_llm):
stopping_condition = lambda x: "terminate" in x.lower()
flow = Flow(llm=mocked_llm, max_loops=5, stopping_condition=stopping_condition)
assert flow.stopping_condition("Please terminate now")
assert not flow.stopping_condition("Continue the process")
# Test calling the flow directly
def test_flow_call(basic_flow):
response = basic_flow("Test call")
assert response == "Test call"
# Test formatting the prompt
def test_format_prompt(basic_flow):
formatted_prompt = basic_flow.format_prompt("Hello {name}", name="John")
assert formatted_prompt == "Hello John"
# Test with max loops
@patch('time.sleep', return_value=None)
def test_max_loops(mocked_sleep, basic_flow):
basic_flow.max_loops = 3
response = basic_flow.run("Looping")
assert response == "Looping"
# Test stopping token
@patch('time.sleep', return_value=None)
def test_stopping_token(mocked_sleep, basic_flow):
basic_flow.stopping_token = "Terminate"
response = basic_flow.run("Loop until Terminate")
assert response == "Loop until Terminate"
# Test interactive mode
def test_interactive_mode(basic_flow):
basic_flow.interactive = True
assert basic_flow.interactive
# Test bulk run with varied inputs
def test_bulk_run_varied_inputs(basic_flow):
inputs = [{"task": "Test1"}, {"task": "Test2"}, {"task": "Stop now"}]
responses = basic_flow.bulk_run(inputs)
assert responses == ["Test1", "Test2", "Stop now"]
# Test loading non-existent file
def test_load_non_existent_file(basic_flow, tmp_path):
file_path = tmp_path / "non_existent.json"
with pytest.raises(FileNotFoundError):
basic_flow.load(file_path)
# Test saving with different memory data
def test_save_different_memory(basic_flow, tmp_path):
file_path = tmp_path / "memory.json"
basic_flow.memory.append(["Task1", "Task2", "Task3"])
basic_flow.save(file_path)
with open(file_path, 'r') as f:
data = json.load(f)
assert data == [["Task1", "Task2", "Task3"]]
# Test the stopping condition check
def test_check_stopping_condition(flow_with_condition):
assert flow_with_condition._check_stopping_condition("Stop this process")
assert not flow_with_condition._check_stopping_condition("Continue the task")
# Test without providing max loops (default value should be 5)
def test_default_max_loops(mocked_llm):
flow = Flow(llm=mocked_llm)
assert flow.max_loops == 5
# Test creating flow from llm and template
def test_from_llm_and_template(mocked_llm):
flow = Flow.from_llm_and_template(mocked_llm, "Test template")
assert isinstance(flow, Flow)
# Mocking the OpenAIChat for testing
@patch('swarms.models.OpenAIChat', autospec=True)
def test_mocked_openai_chat(MockedOpenAIChat):
llm = MockedOpenAIChat(openai_api_key=openai_api_key)
llm.return_value = MagicMock()
flow = Flow(llm=llm, max_loops=5)
response = flow.run("Mocked run")
assert MockedOpenAIChat.called
# Test retry attempts
@patch('time.sleep', return_value=None)
def test_retry_attempts(mocked_sleep, basic_flow):
basic_flow.retry_attempts = 2
basic_flow.llm.side_effect = [Exception("Test Exception"), "Valid response"]
response = basic_flow.run("Test retry")
assert response == "Valid response"
# Test different loop intervals
@patch('time.sleep', return_value=None)
def test_different_loop_intervals(mocked_sleep, basic_flow):
basic_flow.loop_interval = 2
response = basic_flow.run("Test loop interval")
assert response == "Test loop interval"
# Test different retry intervals
@patch('time.sleep', return_value=None)
def test_different_retry_intervals(mocked_sleep, basic_flow):
basic_flow.retry_interval = 2
response = basic_flow.run("Test retry interval")
assert response == "Test retry interval"
# Test invoking the flow with additional kwargs
@patch('time.sleep', return_value=None)
def test_flow_call_with_kwargs(mocked_sleep, basic_flow):
response = basic_flow("Test call", param1="value1", param2="value2")
assert response == "Test call"
# Test initializing the flow with all parameters
def test_flow_initialization_all_params(mocked_llm):
flow = Flow(
llm=mocked_llm,
max_loops=10,
stopping_condition=stop_when_repeats,
loop_interval=2,
retry_attempts=4,
retry_interval=2,
interactive=True,
param1="value1",
param2="value2"
)
assert flow.max_loops == 10
assert flow.loop_interval == 2
assert flow.retry_attempts == 4
assert flow.retry_interval == 2
assert flow.interactive
# Test the stopping token is in the response
@patch('time.sleep', return_value=None)
def test_stopping_token_in_response(mocked_sleep, basic_flow):
response = basic_flow.run("Test stopping token")
assert basic_flow.stopping_token in response

@ -0,0 +1,13 @@
from swarms.models import OpenAIChat
from swarms.structs import Workflow
llm = OpenAIChat(
openai_api_key=""
)
workflow = Workflow(llm)
workflow.add("What's the weather in miami")
workflow.run()
Loading…
Cancel
Save