flow example, save and load state

Former-commit-id: 7d888c6a71
jojo-group-chat
Kye 1 year ago
parent 0598b64df6
commit 9437cbcdfe

@ -23,7 +23,12 @@ flow = Flow(
# dynamic_temperature=False, # Set to 'True' for dynamic temperature handling.
)
# out = flow.load_state("flow_state.json")
# temp = flow.dynamic_temperature()
# filter = flow.add_response_filter("Trump")
out = flow.run("Generate a 10,000 word blog on health and wellness.")
# out = flow.validate_response(out)
# out = flow.analyze_feedback(out)
# out = flow.print_history_and_memory()
# out = flow.save_state("flow_state.json")
print(out)

@ -0,0 +1,14 @@
{
"memory": [
[
"Human: Generate a 10,000 word blog on health and wellness."
]
],
"llm_params": {},
"loop_interval": 1,
"retry_attempts": 3,
"retry_interval": 1,
"interactive": false,
"dashboard": true,
"dynamic_temperature": false
}

@ -1,6 +1,7 @@
from swarms.agents.omni_modal_agent import OmniModalAgent
from swarms.agents.hf_agents import HFAgent
from swarms.agents.message import Message
# from swarms.agents.stream_response import stream
from swarms.agents.base import AbstractAgent
from swarms.agents.registry import Registry

@ -2,18 +2,16 @@
TODO:
- Add tools
- Add open interpreter style conversation
- Add configurable save and restore so the user can restore from previus flows
- Add memory vector database retrieval
"""
import json
import logging
import time
from typing import Any, Callable, Dict, List, Optional, Tuple, Generator
from typing import Any, Callable, Dict, List, Optional, Tuple
from termcolor import colored
import inspect
import random
# from swarms.tools.tool import BaseTool
# Constants
@ -36,7 +34,6 @@ When you have finished the task, and you feel as if you are done: output a speci
This will enable you to leave the flow loop.
"""
# Custome stopping condition
def stop_when_repeats(response: str) -> bool:
# Stop if the word stop appears in the response
@ -209,7 +206,7 @@ class Flow:
print(dashboard)
def run(self, task: str, **kwargs):
def run(self, task: str, save: bool = True, **kwargs):
"""
Run the autonomous agent loop
@ -223,7 +220,16 @@ class Flow:
4. If stopping condition is not met, generate a response
5. Repeat until stopping condition is met or max_loops is reached
Example:
>>> out = flow.run("Generate a 10,000 word blog on health and wellness.")
"""
# Start with a new history or continue from the last saved state
if not self.memory or not self.memory[-1]:
history = [f"Human: {task}"]
else:
history = self.memory[-1]
response = task
history = [f"Human: {task}"]
@ -231,9 +237,12 @@ class Flow:
if self.dashboard:
self.print_dashboard(task)
for i in range(self.max_loops):
# Start or continue the loop process
for i in range(len(history), self.max_loops):
print(colored(f"\nLoop {i+1} of {self.max_loops}", "blue"))
print("\n")
response = history[-1].split(": ", 1)[-1] # Get the last response
if self._check_stopping_condition(response) or parse_done_token(response):
break
@ -245,15 +254,8 @@ class Flow:
while attempt < self.retry_attempts:
try:
response = self.llm(
f"""
SYSTEM_PROMPT:
{FLOW_SYSTEM_PROMPT}
History: {response}
""",
**kwargs,
self.agent_history_prompt(FLOW_SYSTEM_PROMPT, response)
** kwargs,
)
# print(f"Next query: {response}")
# break
@ -274,6 +276,10 @@ class Flow:
history.append(response)
time.sleep(self.loop_interval)
self.memory.append(history)
if save:
self.save("flow_history.json")
return response # , history
def _run(self, **kwargs: Any) -> str:
@ -283,32 +289,31 @@ class Flow:
logging.info(f"Message history: {history}")
return response
def bulk_run(self, inputs: List[Dict[str, Any]]) -> List[str]:
"""Generate responses for multiple input sets."""
return [self.run(**input_data) for input_data in inputs]
def run_dynamically(self, task: str, max_loops: Optional[int] = None):
def agent_history_prompt(
self,
system_prompt: str = FLOW_SYSTEM_PROMPT,
history=None,
):
"""
Run the autonomous agent loop dynamically based on the <DONE>
# Usage Example
# Initialize the Flow
flow = Flow(llm=lambda x: x, max_loops=5)
Generate the agent history prompt
# Run dynamically based on <DONE> token and optional max loops
response = flow.run_dynamically("Generate a report <DONE>", max_loops=3)
print(response)
Args:
system_prompt (str): The system prompt
history (List[str]): The history of the conversation
response = flow.run_dynamically("Generate a report <DONE>")
print(response)
Returns:
str: The agent history prompt
"""
agent_history_prompt = f"""
SYSTEM_PROMPT: {system_prompt}
History: {history}
"""
if "<DONE>" in task:
self.stopping_condition = parse_done_token
self.max_loops = max_loops or float("inf")
response = self.run(task)
return response
return agent_history_prompt
def bulk_run(self, inputs: List[Dict[str, Any]]) -> List[str]:
"""Generate responses for multiple input sets."""
return [self.run(**input_data) for input_data in inputs]
@staticmethod
def from_llm_and_template(llm: Any, template: str) -> "Flow":
@ -339,6 +344,60 @@ class Flow:
return False
return True
def print_history_and_memory(self):
"""
Prints the entire history and memory of the flow.
Each message is colored and formatted for better readability.
"""
print(colored("Flow History and Memory", "cyan", attrs=["bold"]))
print(colored("========================", "cyan", attrs=["bold"]))
for loop_index, history in enumerate(self.memory, start=1):
print(colored(f"\nLoop {loop_index}:", "yellow", attrs=["bold"]))
for message in history:
speaker, _, message_text = message.partition(": ")
if "Human" in speaker:
print(colored(f"{speaker}:", "green") + f" {message_text}")
else:
print(colored(f"{speaker}:", "blue") + f" {message_text}")
print(colored("------------------------", "cyan"))
print(colored("End of Flow History", "cyan", attrs=["bold"]))
def step(self, task: str, **kwargs):
"""
Executes a single step in the flow interaction, generating a response
from the language model based on the given input text.
Args:
input_text (str): The input text to prompt the language model with.
Returns:
str: The language model's generated response.
Raises:
Exception: If an error occurs during response generation.
"""
try:
# Generate the response using lm
response = self.llm(task, **kwargs)
# Update the flow's history with the new interaction
if self.interactive:
self.memory.append(f"AI: {response}")
self.memory.append(f"Human: {task}")
else:
self.memory.append(f"AI: {response}")
return response
except Exception as error:
logging.error(f"Error generating response: {error}")
raise
def graceful_shutdown(self):
"""Gracefully shutdown the system saving the state"""
return self.save_state("flow_state.json")
def run_with_timeout(self, task: str, timeout: int = 60) -> str:
"""Run the loop but stop if it takes longer than the timeout"""
start_time = time.time()
@ -455,23 +514,97 @@ class Flow:
print()
return response
def streamed_token_generation(self, prompt: str) -> Generator[str, None, None]:
def get_llm_params(self):
"""
Extracts and returns the parameters of the llm object for serialization.
It assumes that the llm object has an __init__ method with parameters that can be used to recreate it.
"""
Generate tokens in real-time for a given prompt.
if not hasattr(self.llm, "__init__"):
return None
init_signature = inspect.signature(self.llm.__init__)
params = init_signature.parameters
llm_params = {}
This method simulates the real-time generation of each token.
For simplicity, we treat each character of the input as a token
and yield them with a slight delay. In a real-world scenario,
this would involve using the LLM's internal methods to generate
the response token by token.
for name, param in params.items():
if name == "self":
continue
if hasattr(self.llm, name):
value = getattr(self.llm, name)
if isinstance(
value, (str, int, float, bool, list, dict, tuple, type(None))
):
llm_params[name] = value
else:
llm_params[name] = str(
value
) # For non-serializable objects, save their string representation.
return llm_params
def save_state(self, file_path: str) -> None:
"""
Saves the current state of the flow to a JSON file, including the llm parameters.
Args:
prompt (str): The input prompt for which the tokens should be generated.
file_path (str): The path to the JSON file where the state will be saved.
Yields:
str: The next token (character) from the generated response.
Example:
>>> flow.save_state('saved_flow.json')
"""
tokens = list(prompt)
for token in tokens:
time.sleep(0.1)
yield token
state = {
"memory": self.memory,
# "llm_params": self.get_llm_params(),
"loop_interval": self.loop_interval,
"retry_attempts": self.retry_attempts,
"retry_interval": self.retry_interval,
"interactive": self.interactive,
"dashboard": self.dashboard,
"dynamic_temperature": self.dynamic_temperature,
}
with open(file_path, "w") as f:
json.dump(state, f, indent=4)
saved = colored("Saved flow state to", "green")
print(f"{saved} {file_path}")
def load_state(self, file_path: str):
"""
Loads the state of the flow from a json file and restores the configuration and memory.
Example:
>>> flow = Flow(llm=llm_instance, max_loops=5)
>>> flow.load_state('saved_flow.json')
>>> flow.run("Continue with the task")
"""
with open(file_path, "r") as f:
state = json.load(f)
# Assuming 'llm_class' is a class reference to the language
# llm_params = state.get("llm_params", {})
# self.llm = self.llm(**llm_params)
# Restore other saved attributes
self.memory = state.get("memory", [])
self.max_loops = state.get("max_loops", 5)
self.loop_interval = state.get("loop_interval", 1)
self.retry_attempts = state.get("retry_attempts", 3)
self.retry_interval = state.get("retry_interval", 1)
self.interactive = state.get("interactive", False)
print(f"Flow state loaded from {file_path}")
def retry_on_failure(self, function, retries: int = 3, retry_delay: int = 1):
"""Retry wrapper for LLM calls."""
attempt = 0
while attempt < retries:
try:
return function()
except Exception as error:
logging.error(f"Error generating response: {error}")
attempt += 1
time.sleep(retry_delay)
raise Exception("All retry attempts failed")

@ -18,3 +18,82 @@ workflow.add("Create a report on these metrics", mistral)
workflow.run()
"""
from dataclasses import dataclass, field
from typing import List, Any, Dict, Callable, Union
from swarms.models import OpenAIChat
from swarms.structs import Flow
# Define a generic Task that can handle different types of callable objects
@dataclass
class Task:
description: str
model: Union[Callable, Flow]
args: List[Any] = field(default_factory=list)
kwargs: Dict[str, Any] = field(default_factory=dict)
result: Any = None
def execute(self):
if isinstance(self.model, Flow):
self.result = self.model.run(*self.args, **self.kwargs)
else:
self.result = self.model(*self.args, **self.kwargs)
# SequentialWorkflow class definition using dataclasses
@dataclass
class SequentialWorkflow:
tasks: List[Task] = field(default_factory=list)
max_loops: int = 1
def add(
self, description: str, model: Union[Callable, Flow], *args, **kwargs
) -> None:
self.tasks.append(
Task(description=description, model=model, args=list(args), kwargs=kwargs)
)
def run(self) -> None:
for _ in range(self.max_loops):
for task in self.tasks:
# Check if the current task can be executed
if task.result is None:
task.execute()
# Pass the result as an argument to the next task if it exists
next_task_index = self.tasks.index(task) + 1
if next_task_index < len(self.tasks):
next_task = self.tasks[next_task_index]
next_task.args.insert(0, task.result)
# Example usage
api_key = "" # Your actual API key here
# Initialize the language model
llm = OpenAIChat(
openai_api_key=api_key,
temperature=0.5,
max_tokens=3000,
)
# Initialize the Flow with the language model
flow1 = Flow(llm=llm, max_loops=5, dashboard=True)
# Create another Flow for a different task
flow2 = Flow(llm=llm, max_loops=5, dashboard=True)
# Create the workflow
workflow = SequentialWorkflow(max_loops=1)
# Add tasks to the workflow
workflow.add("Generate a 10,000 word blog on health and wellness.", flow1)
# Suppose the next task takes the output of the first task as input
workflow.add("Summarize the generated blog", flow2)
# Run the workflow
workflow.run()
# Output the results
for task in workflow.tasks:
print(f"Task: {task.description}, Result: {task.result}")

@ -3,12 +3,15 @@
import pytest
import openai
from unittest.mock import patch
from swarms.models.simple_ada import get_ada_embeddings # Adjust this import path to your project structure
from swarms.models.simple_ada import (
get_ada_embeddings,
) # Adjust this import path to your project structure
from os import getenv
from dotenv import load_dotenv
load_dotenv()
# Fixture for test texts
@pytest.fixture
def test_texts():
@ -18,20 +21,24 @@ def test_texts():
"A quick brown fox jumps over the lazy dog",
]
# Basic Test
def test_get_ada_embeddings_basic(test_texts):
with patch('openai.Embedding.create') as mock_create:
with patch("openai.Embedding.create") as mock_create:
# Mocking the OpenAI API call
mock_create.return_value = {
"data": [
{"embedding": [0.1, 0.2, 0.3]}
]
}
mock_create.return_value = {"data": [{"embedding": [0.1, 0.2, 0.3]}]}
for text in test_texts:
embedding = get_ada_embeddings(text)
assert embedding == [0.1, 0.2, 0.3], "Embedding does not match expected output"
mock_create.assert_called_with(input=[text.replace("\n", " ")], model="text-embedding-ada-002")
assert embedding == [
0.1,
0.2,
0.3,
], "Embedding does not match expected output"
mock_create.assert_called_with(
input=[text.replace("\n", " ")], model="text-embedding-ada-002"
)
# Parameterized Test
@pytest.mark.parametrize(
@ -42,27 +49,28 @@ def test_get_ada_embeddings_basic(test_texts):
],
)
def test_get_ada_embeddings_models(text, model, expected_call_model):
with patch('openai.Embedding.create') as mock_create:
mock_create.return_value = {
"data": [
{"embedding": [0.1, 0.2, 0.3]}
]
}
with patch("openai.Embedding.create") as mock_create:
mock_create.return_value = {"data": [{"embedding": [0.1, 0.2, 0.3]}]}
_ = get_ada_embeddings(text, model=model)
mock_create.assert_called_with(input=[text], model=expected_call_model)
# Exception Test
def test_get_ada_embeddings_exception():
with patch('openai.Embedding.create') as mock_create:
with patch("openai.Embedding.create") as mock_create:
mock_create.side_effect = openai.error.OpenAIError("Test error")
with pytest.raises(openai.error.OpenAIError):
get_ada_embeddings("Some text")
# Tests for environment variable loading
def test_env_var_loading(monkeypatch):
monkeypatch.setenv("OPENAI_API_KEY", "testkey123")
with patch('openai.Embedding.create'):
assert getenv("OPENAI_API_KEY") == "testkey123", "Environment variable for API key is not set correctly"
with patch("openai.Embedding.create"):
assert (
getenv("OPENAI_API_KEY") == "testkey123"
), "Environment variable for API key is not set correctly"
# ... more tests to cover other aspects such as different input types, large inputs, invalid inputs, etc.

@ -70,11 +70,14 @@ def test_llm_memory_consumption(llm_instance):
# Test different initialization parameters
@pytest.mark.parametrize("model_id, max_length", [
@pytest.mark.parametrize(
"model_id, max_length",
[
("gpt2-small", 100),
("gpt2-medium", 200),
("gpt2-large", None) # None to check default behavior
])
("gpt2-large", None), # None to check default behavior
],
)
def test_llm_initialization_params(model_id, max_length):
if max_length:
instance = HuggingfaceLLM(model_id=model_id, max_length=max_length)
@ -157,11 +160,14 @@ def test_llm_timeout_handling(mock_run, llm_instance):
@patch("swarms.models.huggingface.HuggingfaceLLM.run")
def test_llm_response_time(mock_run, llm_instance):
import time
mock_run.return_value = "mocked output"
start_time = time.time()
llm_instance.run("test task for response time")
end_time = time.time()
assert end_time - start_time < 1 # Assuming the response should be faster than 1 second
assert (
end_time - start_time < 1
) # Assuming the response should be faster than 1 second
# Test the logging of a warning for long inputs
@ -173,7 +179,9 @@ def test_llm_long_input_warning(mock_warning, llm_instance):
# Test for run method behavior when model raises an exception
@patch("swarms.models.huggingface.HuggingfaceLLM._model.generate", side_effect=RuntimeError)
@patch(
"swarms.models.huggingface.HuggingfaceLLM._model.generate", side_effect=RuntimeError
)
def test_llm_run_model_exception(mock_generate, llm_instance):
with pytest.raises(RuntimeError):
llm_instance.run("test task when model fails")
@ -219,6 +227,7 @@ def test_llm_multilingual_input(mock_run, llm_instance):
result = llm_instance.run(multilingual_input)
assert isinstance(result, str) # Simple check to ensure output is string type
# Test caching mechanism to prevent re-running the same inputs
@patch("swarms.models.huggingface.HuggingfaceLLM.run")
def test_llm_caching_mechanism(mock_run, llm_instance):

Loading…
Cancel
Save