From 7d888c6a71fe8e4600458cbc03d5e649705ea30d Mon Sep 17 00:00:00 2001 From: Kye Date: Fri, 3 Nov 2023 19:57:19 -0400 Subject: [PATCH] flow example, save and load state --- flow.py | 9 +- flow_state.json | 14 ++ swarms/agents/__init__.py | 1 + swarms/models/distilled_whisperx.py | 2 +- swarms/models/huggingface.py | 2 +- swarms/structs/flow.py | 233 ++++++++++++++++++++------ swarms/structs/sequential_workflow.py | 79 +++++++++ tests/models/ada.py | 46 ++--- tests/models/huggingface.py | 23 ++- 9 files changed, 329 insertions(+), 80 deletions(-) create mode 100644 flow_state.json diff --git a/flow.py b/flow.py index 1eb46ee6..ed402a92 100644 --- a/flow.py +++ b/flow.py @@ -23,7 +23,12 @@ flow = Flow( # dynamic_temperature=False, # Set to 'True' for dynamic temperature handling. ) - +# out = flow.load_state("flow_state.json") +# temp = flow.dynamic_temperature() +# filter = flow.add_response_filter("Trump") out = flow.run("Generate a 10,000 word blog on health and wellness.") - +# out = flow.validate_response(out) +# out = flow.analyze_feedback(out) +# out = flow.print_history_and_memory() +# out = flow.save_state("flow_state.json") print(out) diff --git a/flow_state.json b/flow_state.json new file mode 100644 index 00000000..8ed134a0 --- /dev/null +++ b/flow_state.json @@ -0,0 +1,14 @@ +{ + "memory": [ + [ + "Human: Generate a 10,000 word blog on health and wellness." + ] + ], + "llm_params": {}, + "loop_interval": 1, + "retry_attempts": 3, + "retry_interval": 1, + "interactive": false, + "dashboard": true, + "dynamic_temperature": false +} \ No newline at end of file diff --git a/swarms/agents/__init__.py b/swarms/agents/__init__.py index f622f3f8..597c8c76 100644 --- a/swarms/agents/__init__.py +++ b/swarms/agents/__init__.py @@ -1,6 +1,7 @@ from swarms.agents.omni_modal_agent import OmniModalAgent from swarms.agents.hf_agents import HFAgent from swarms.agents.message import Message + # from swarms.agents.stream_response import stream from swarms.agents.base import AbstractAgent from swarms.agents.registry import Registry diff --git a/swarms/models/distilled_whisperx.py b/swarms/models/distilled_whisperx.py index 2eb2788d..8062daa4 100644 --- a/swarms/models/distilled_whisperx.py +++ b/swarms/models/distilled_whisperx.py @@ -1,3 +1,3 @@ """ -""" \ No newline at end of file +""" diff --git a/swarms/models/huggingface.py b/swarms/models/huggingface.py index 437d9144..d18b1b9d 100644 --- a/swarms/models/huggingface.py +++ b/swarms/models/huggingface.py @@ -294,7 +294,7 @@ class HuggingfaceLLM: ) print(dashboard) - + def set_device(self, device): """ Changes the device used for inference. diff --git a/swarms/structs/flow.py b/swarms/structs/flow.py index 40e00ca1..1d46678c 100644 --- a/swarms/structs/flow.py +++ b/swarms/structs/flow.py @@ -2,18 +2,16 @@ TODO: - Add tools - Add open interpreter style conversation -- Add configurable save and restore so the user can restore from previus flows - Add memory vector database retrieval """ import json import logging import time -from typing import Any, Callable, Dict, List, Optional, Tuple, Generator +from typing import Any, Callable, Dict, List, Optional, Tuple from termcolor import colored import inspect import random -# from swarms.tools.tool import BaseTool # Constants @@ -36,7 +34,6 @@ When you have finished the task, and you feel as if you are done: output a speci This will enable you to leave the flow loop. """ - # Custome stopping condition def stop_when_repeats(response: str) -> bool: # Stop if the word stop appears in the response @@ -209,7 +206,7 @@ class Flow: print(dashboard) - def run(self, task: str, **kwargs): + def run(self, task: str, save: bool = True, **kwargs): """ Run the autonomous agent loop @@ -223,7 +220,16 @@ class Flow: 4. If stopping condition is not met, generate a response 5. Repeat until stopping condition is met or max_loops is reached + Example: + >>> out = flow.run("Generate a 10,000 word blog on health and wellness.") + """ + # Start with a new history or continue from the last saved state + if not self.memory or not self.memory[-1]: + history = [f"Human: {task}"] + else: + history = self.memory[-1] + response = task history = [f"Human: {task}"] @@ -231,9 +237,12 @@ class Flow: if self.dashboard: self.print_dashboard(task) - for i in range(self.max_loops): + # Start or continue the loop process + for i in range(len(history), self.max_loops): print(colored(f"\nLoop {i+1} of {self.max_loops}", "blue")) print("\n") + response = history[-1].split(": ", 1)[-1] # Get the last response + if self._check_stopping_condition(response) or parse_done_token(response): break @@ -245,15 +254,8 @@ class Flow: while attempt < self.retry_attempts: try: response = self.llm( - f""" - SYSTEM_PROMPT: - {FLOW_SYSTEM_PROMPT} - - - History: {response} - - """, - **kwargs, + self.agent_history_prompt(FLOW_SYSTEM_PROMPT, response) + ** kwargs, ) # print(f"Next query: {response}") # break @@ -274,6 +276,10 @@ class Flow: history.append(response) time.sleep(self.loop_interval) self.memory.append(history) + + if save: + self.save("flow_history.json") + return response # , history def _run(self, **kwargs: Any) -> str: @@ -283,32 +289,31 @@ class Flow: logging.info(f"Message history: {history}") return response - def bulk_run(self, inputs: List[Dict[str, Any]]) -> List[str]: - """Generate responses for multiple input sets.""" - return [self.run(**input_data) for input_data in inputs] - - def run_dynamically(self, task: str, max_loops: Optional[int] = None): + def agent_history_prompt( + self, + system_prompt: str = FLOW_SYSTEM_PROMPT, + history=None, + ): """ - Run the autonomous agent loop dynamically based on the - - # Usage Example + Generate the agent history prompt - # Initialize the Flow - flow = Flow(llm=lambda x: x, max_loops=5) - - # Run dynamically based on token and optional max loops - response = flow.run_dynamically("Generate a report ", max_loops=3) - print(response) + Args: + system_prompt (str): The system prompt + history (List[str]): The history of the conversation - response = flow.run_dynamically("Generate a report ") - print(response) + Returns: + str: The agent history prompt + """ + agent_history_prompt = f""" + SYSTEM_PROMPT: {system_prompt} + History: {history} """ - if "" in task: - self.stopping_condition = parse_done_token - self.max_loops = max_loops or float("inf") - response = self.run(task) - return response + return agent_history_prompt + + def bulk_run(self, inputs: List[Dict[str, Any]]) -> List[str]: + """Generate responses for multiple input sets.""" + return [self.run(**input_data) for input_data in inputs] @staticmethod def from_llm_and_template(llm: Any, template: str) -> "Flow": @@ -339,6 +344,60 @@ class Flow: return False return True + def print_history_and_memory(self): + """ + Prints the entire history and memory of the flow. + Each message is colored and formatted for better readability. + """ + print(colored("Flow History and Memory", "cyan", attrs=["bold"])) + print(colored("========================", "cyan", attrs=["bold"])) + for loop_index, history in enumerate(self.memory, start=1): + print(colored(f"\nLoop {loop_index}:", "yellow", attrs=["bold"])) + for message in history: + speaker, _, message_text = message.partition(": ") + if "Human" in speaker: + print(colored(f"{speaker}:", "green") + f" {message_text}") + else: + print(colored(f"{speaker}:", "blue") + f" {message_text}") + print(colored("------------------------", "cyan")) + print(colored("End of Flow History", "cyan", attrs=["bold"])) + + def step(self, task: str, **kwargs): + """ + + Executes a single step in the flow interaction, generating a response + from the language model based on the given input text. + + Args: + input_text (str): The input text to prompt the language model with. + + Returns: + str: The language model's generated response. + + Raises: + Exception: If an error occurs during response generation. + + """ + try: + # Generate the response using lm + response = self.llm(task, **kwargs) + + # Update the flow's history with the new interaction + if self.interactive: + self.memory.append(f"AI: {response}") + self.memory.append(f"Human: {task}") + else: + self.memory.append(f"AI: {response}") + + return response + except Exception as error: + logging.error(f"Error generating response: {error}") + raise + + def graceful_shutdown(self): + """Gracefully shutdown the system saving the state""" + return self.save_state("flow_state.json") + def run_with_timeout(self, task: str, timeout: int = 60) -> str: """Run the loop but stop if it takes longer than the timeout""" start_time = time.time() @@ -455,23 +514,97 @@ class Flow: print() return response - def streamed_token_generation(self, prompt: str) -> Generator[str, None, None]: + def get_llm_params(self): + """ + Extracts and returns the parameters of the llm object for serialization. + It assumes that the llm object has an __init__ method with parameters that can be used to recreate it. """ - Generate tokens in real-time for a given prompt. + if not hasattr(self.llm, "__init__"): + return None - This method simulates the real-time generation of each token. - For simplicity, we treat each character of the input as a token - and yield them with a slight delay. In a real-world scenario, - this would involve using the LLM's internal methods to generate - the response token by token. + init_signature = inspect.signature(self.llm.__init__) + params = init_signature.parameters + llm_params = {} + + for name, param in params.items(): + if name == "self": + continue + if hasattr(self.llm, name): + value = getattr(self.llm, name) + if isinstance( + value, (str, int, float, bool, list, dict, tuple, type(None)) + ): + llm_params[name] = value + else: + llm_params[name] = str( + value + ) # For non-serializable objects, save their string representation. + + return llm_params + + def save_state(self, file_path: str) -> None: + """ + Saves the current state of the flow to a JSON file, including the llm parameters. Args: - prompt (str): The input prompt for which the tokens should be generated. + file_path (str): The path to the JSON file where the state will be saved. - Yields: - str: The next token (character) from the generated response. + Example: + >>> flow.save_state('saved_flow.json') """ - tokens = list(prompt) - for token in tokens: - time.sleep(0.1) - yield token + state = { + "memory": self.memory, + # "llm_params": self.get_llm_params(), + "loop_interval": self.loop_interval, + "retry_attempts": self.retry_attempts, + "retry_interval": self.retry_interval, + "interactive": self.interactive, + "dashboard": self.dashboard, + "dynamic_temperature": self.dynamic_temperature, + } + + with open(file_path, "w") as f: + json.dump(state, f, indent=4) + + saved = colored("Saved flow state to", "green") + print(f"{saved} {file_path}") + + def load_state(self, file_path: str): + """ + Loads the state of the flow from a json file and restores the configuration and memory. + + + Example: + >>> flow = Flow(llm=llm_instance, max_loops=5) + >>> flow.load_state('saved_flow.json') + >>> flow.run("Continue with the task") + + """ + with open(file_path, "r") as f: + state = json.load(f) + + # Assuming 'llm_class' is a class reference to the language + # llm_params = state.get("llm_params", {}) + # self.llm = self.llm(**llm_params) + + # Restore other saved attributes + self.memory = state.get("memory", []) + self.max_loops = state.get("max_loops", 5) + self.loop_interval = state.get("loop_interval", 1) + self.retry_attempts = state.get("retry_attempts", 3) + self.retry_interval = state.get("retry_interval", 1) + self.interactive = state.get("interactive", False) + + print(f"Flow state loaded from {file_path}") + + def retry_on_failure(self, function, retries: int = 3, retry_delay: int = 1): + """Retry wrapper for LLM calls.""" + attempt = 0 + while attempt < retries: + try: + return function() + except Exception as error: + logging.error(f"Error generating response: {error}") + attempt += 1 + time.sleep(retry_delay) + raise Exception("All retry attempts failed") diff --git a/swarms/structs/sequential_workflow.py b/swarms/structs/sequential_workflow.py index 2df95c07..f27f3989 100644 --- a/swarms/structs/sequential_workflow.py +++ b/swarms/structs/sequential_workflow.py @@ -18,3 +18,82 @@ workflow.add("Create a report on these metrics", mistral) workflow.run() """ +from dataclasses import dataclass, field +from typing import List, Any, Dict, Callable, Union +from swarms.models import OpenAIChat +from swarms.structs import Flow + + +# Define a generic Task that can handle different types of callable objects +@dataclass +class Task: + description: str + model: Union[Callable, Flow] + args: List[Any] = field(default_factory=list) + kwargs: Dict[str, Any] = field(default_factory=dict) + result: Any = None + + def execute(self): + if isinstance(self.model, Flow): + self.result = self.model.run(*self.args, **self.kwargs) + else: + self.result = self.model(*self.args, **self.kwargs) + + +# SequentialWorkflow class definition using dataclasses +@dataclass +class SequentialWorkflow: + tasks: List[Task] = field(default_factory=list) + max_loops: int = 1 + + def add( + self, description: str, model: Union[Callable, Flow], *args, **kwargs + ) -> None: + self.tasks.append( + Task(description=description, model=model, args=list(args), kwargs=kwargs) + ) + + def run(self) -> None: + for _ in range(self.max_loops): + for task in self.tasks: + # Check if the current task can be executed + if task.result is None: + task.execute() + # Pass the result as an argument to the next task if it exists + next_task_index = self.tasks.index(task) + 1 + if next_task_index < len(self.tasks): + next_task = self.tasks[next_task_index] + next_task.args.insert(0, task.result) + + +# Example usage +api_key = "" # Your actual API key here + +# Initialize the language model +llm = OpenAIChat( + openai_api_key=api_key, + temperature=0.5, + max_tokens=3000, +) + +# Initialize the Flow with the language model +flow1 = Flow(llm=llm, max_loops=5, dashboard=True) + +# Create another Flow for a different task +flow2 = Flow(llm=llm, max_loops=5, dashboard=True) + +# Create the workflow +workflow = SequentialWorkflow(max_loops=1) + +# Add tasks to the workflow +workflow.add("Generate a 10,000 word blog on health and wellness.", flow1) + +# Suppose the next task takes the output of the first task as input +workflow.add("Summarize the generated blog", flow2) + +# Run the workflow +workflow.run() + +# Output the results +for task in workflow.tasks: + print(f"Task: {task.description}, Result: {task.result}") diff --git a/tests/models/ada.py b/tests/models/ada.py index 786b162d..08f1a687 100644 --- a/tests/models/ada.py +++ b/tests/models/ada.py @@ -3,12 +3,15 @@ import pytest import openai from unittest.mock import patch -from swarms.models.simple_ada import get_ada_embeddings # Adjust this import path to your project structure +from swarms.models.simple_ada import ( + get_ada_embeddings, +) # Adjust this import path to your project structure from os import getenv from dotenv import load_dotenv load_dotenv() + # Fixture for test texts @pytest.fixture def test_texts(): @@ -18,20 +21,24 @@ def test_texts(): "A quick brown fox jumps over the lazy dog", ] + # Basic Test def test_get_ada_embeddings_basic(test_texts): - with patch('openai.Embedding.create') as mock_create: + with patch("openai.Embedding.create") as mock_create: # Mocking the OpenAI API call - mock_create.return_value = { - "data": [ - {"embedding": [0.1, 0.2, 0.3]} - ] - } - + mock_create.return_value = {"data": [{"embedding": [0.1, 0.2, 0.3]}]} + for text in test_texts: embedding = get_ada_embeddings(text) - assert embedding == [0.1, 0.2, 0.3], "Embedding does not match expected output" - mock_create.assert_called_with(input=[text.replace("\n", " ")], model="text-embedding-ada-002") + assert embedding == [ + 0.1, + 0.2, + 0.3, + ], "Embedding does not match expected output" + mock_create.assert_called_with( + input=[text.replace("\n", " ")], model="text-embedding-ada-002" + ) + # Parameterized Test @pytest.mark.parametrize( @@ -42,27 +49,28 @@ def test_get_ada_embeddings_basic(test_texts): ], ) def test_get_ada_embeddings_models(text, model, expected_call_model): - with patch('openai.Embedding.create') as mock_create: - mock_create.return_value = { - "data": [ - {"embedding": [0.1, 0.2, 0.3]} - ] - } + with patch("openai.Embedding.create") as mock_create: + mock_create.return_value = {"data": [{"embedding": [0.1, 0.2, 0.3]}]} _ = get_ada_embeddings(text, model=model) mock_create.assert_called_with(input=[text], model=expected_call_model) + # Exception Test def test_get_ada_embeddings_exception(): - with patch('openai.Embedding.create') as mock_create: + with patch("openai.Embedding.create") as mock_create: mock_create.side_effect = openai.error.OpenAIError("Test error") with pytest.raises(openai.error.OpenAIError): get_ada_embeddings("Some text") + # Tests for environment variable loading def test_env_var_loading(monkeypatch): monkeypatch.setenv("OPENAI_API_KEY", "testkey123") - with patch('openai.Embedding.create'): - assert getenv("OPENAI_API_KEY") == "testkey123", "Environment variable for API key is not set correctly" + with patch("openai.Embedding.create"): + assert ( + getenv("OPENAI_API_KEY") == "testkey123" + ), "Environment variable for API key is not set correctly" + # ... more tests to cover other aspects such as different input types, large inputs, invalid inputs, etc. diff --git a/tests/models/huggingface.py b/tests/models/huggingface.py index 847ced06..71fefa67 100644 --- a/tests/models/huggingface.py +++ b/tests/models/huggingface.py @@ -70,11 +70,14 @@ def test_llm_memory_consumption(llm_instance): # Test different initialization parameters -@pytest.mark.parametrize("model_id, max_length", [ - ("gpt2-small", 100), - ("gpt2-medium", 200), - ("gpt2-large", None) # None to check default behavior -]) +@pytest.mark.parametrize( + "model_id, max_length", + [ + ("gpt2-small", 100), + ("gpt2-medium", 200), + ("gpt2-large", None), # None to check default behavior + ], +) def test_llm_initialization_params(model_id, max_length): if max_length: instance = HuggingfaceLLM(model_id=model_id, max_length=max_length) @@ -157,11 +160,14 @@ def test_llm_timeout_handling(mock_run, llm_instance): @patch("swarms.models.huggingface.HuggingfaceLLM.run") def test_llm_response_time(mock_run, llm_instance): import time + mock_run.return_value = "mocked output" start_time = time.time() llm_instance.run("test task for response time") end_time = time.time() - assert end_time - start_time < 1 # Assuming the response should be faster than 1 second + assert ( + end_time - start_time < 1 + ) # Assuming the response should be faster than 1 second # Test the logging of a warning for long inputs @@ -173,7 +179,9 @@ def test_llm_long_input_warning(mock_warning, llm_instance): # Test for run method behavior when model raises an exception -@patch("swarms.models.huggingface.HuggingfaceLLM._model.generate", side_effect=RuntimeError) +@patch( + "swarms.models.huggingface.HuggingfaceLLM._model.generate", side_effect=RuntimeError +) def test_llm_run_model_exception(mock_generate, llm_instance): with pytest.raises(RuntimeError): llm_instance.run("test task when model fails") @@ -219,6 +227,7 @@ def test_llm_multilingual_input(mock_run, llm_instance): result = llm_instance.run(multilingual_input) assert isinstance(result, str) # Simple check to ensure output is string type + # Test caching mechanism to prevent re-running the same inputs @patch("swarms.models.huggingface.HuggingfaceLLM.run") def test_llm_caching_mechanism(mock_run, llm_instance):