diff --git a/playground/posmed/positive_med.py b/playground/posmed/positive_med.py index 9e44bc69..ccb45083 100644 --- a/playground/posmed/positive_med.py +++ b/playground/posmed/positive_med.py @@ -1,7 +1,7 @@ from swarms import OpenAIChat DRAFT_PROMPT = """# MISSION -Write a 100% unique, creative and in human-like style article of a minimum of 2000 words using headings and sub-headings. +Write a 100% unique, creative and in human-like style article of a minimum of 5,000 words using headings and sub-headings. Ensure your tone is Professional and casual while focusing on presenting information and analysis without excessive embellishment. The topic is: {{TOPIC}} diff --git a/swarms/structs/flow.py b/swarms/structs/flow.py index 52b36edf..2c45df53 100644 --- a/swarms/structs/flow.py +++ b/swarms/structs/flow.py @@ -44,17 +44,60 @@ flow.save("path/flow.yaml") import json import logging import time -from typing import Any, Callable, Dict, List, Optional +from typing import Any, Callable, Dict, List, Optional, Tuple from termcolor import colored - # Custome stopping condition def stop_when_repeats(response: str) -> bool: # Stop if the word stop appears in the response return "Stop" in response.lower() +def parse_done_token(response: str) -> bool: + """Parse the response to see if the done token is present""" + return "" in response + class Flow: + """ + Flow is a chain like structure from langchain that provides the autonomy to language models + to generate sequential responses. + + Features: + * User defined queries + * Dynamic keep generating until is outputted by the agent + * Interactive, AI generates, then user input + * Message history and performance history fed -> into context + * Ability to save and load flows + * Ability to provide feedback on responses + * Ability to provide a stopping condition + * Ability to provide a retry mechanism + * Ability to provide a loop interval + + Args: + llm (Any): The language model to use + max_loops (int): The maximum number of loops to run + stopping_condition (Optional[Callable[[str], bool]]): A stopping condition + loop_interval (int): The interval between loops + retry_attempts (int): The number of retry attempts + retry_interval (int): The interval between retry attempts + interactive (bool): Whether or not to run in interactive mode + **kwargs (Any): Any additional keyword arguments + + Example: + >>> from swarms.models import OpenAIChat + >>> from swarms.structs import Flow + >>> llm = OpenAIChat( + ... openai_api_key=api_key, + ... temperature=0.5, + ... ) + >>> flow = Flow( + ... llm=llm, max_loops=5, + ... #system_prompt=SYSTEM_PROMPT, + ... #retry_interval=1, + ... ) + >>> flow.run("Generate a 10,000 word blog") + >>> flow.save("path/flow.yaml") + """ def __init__( self, llm: Any, @@ -148,6 +191,33 @@ class Flow: def bulk_run(self, inputs: List[Dict[str, Any]]) -> List[str]: """Generate responses for multiple input sets.""" return [self.run(**input_data) for input_data in inputs] + + def run_dynamically( + self, + task: str, + max_loops: Optional[int] = None + ): + """ + Run the autonomous agent loop dynamically based on the + + # Usage Example + + # Initialize the Flow + flow = Flow(llm=lambda x: x, max_loops=5) + + # Run dynamically based on token and optional max loops + response = flow.run_dynamically("Generate a report ", max_loops=3) + print(response) + + response = flow.run_dynamically("Generate a report ") + print(response) + + """ + if "" in task: + self.stopping_condition = parse_done_token + self.max_loops = max_loops or float('inf') + response = self.run(task) + return response @staticmethod def from_llm_and_template(llm: Any, template: str) -> "Flow": @@ -170,3 +240,139 @@ class Flow: with open(file_path, 'r') as f: self.memory = json.load(f) print(f"Loaded flow history from {file_path}") + + def validate_response(self, response: str) -> bool: + """Validate the response based on certain criteria""" + if len(response) < 5: + print("Response is too short") + return False + return True + + def run_with_timeout( + self, + task: str, + timeout: int = 60 + ) -> str: + """Run the loop but stop if it takes longer than the timeout""" + start_time = time.time() + response = self.run(task) + end_time = time.time() + if end_time - start_time > timeout: + print("Operaiton timed out") + return 'Timeout' + return response + + def backup_memory_to_s3( + self, + bucket_name: str, + object_name: str + ): + """Backup the memory to S3""" + import boto3 + s3 = boto3.client('s3') + s3.put_object(Bucket=bucket_name, Key=object_name, Body=json.dumps(self.memory)) + print(f"Backed up memory to S3: {bucket_name}/{object_name}") + + def analyze_feedback(self): + """Analyze the feedback for issues""" + feedback_counts = {} + for feedback in self.feedback: + if feedback in feedback_counts: + feedback_counts[feedback] += 1 + else: + feedback_counts[feedback] = 1 + print(f"Feedback counts: {feedback_counts}") + + def undo_last(self) -> Tuple[str, str]: + """ + Response the last response and return the previous state + + Example: + # Feature 2: Undo functionality + response = flow.run("Another task") + print(f"Response: {response}") + previous_state, message = flow.undo_last() + print(message) + + """ + if len(self.memory) < 2: + return None, None + + # Remove the last response + self.memory.pop() + + # Get the previous state + previous_state = self.memory[-1][-1] + return previous_state, f"Restored to {previous_state}" + + # Response Filtering + def add_response_filter(self, filter_word: str) -> None: + """ + Add a response filter to filter out certain words from the response + + Example: + flow.add_response_filter("Trump") + flow.run("Generate a report on Trump") + + + """ + self.reponse_filters.append(filter_word) + + def apply_reponse_filters(self, response: str) -> str: + """ + Apply the response filters to the response + + + """ + for word in self.response_filters: + response = response.replace(word, "[FILTERED]") + return response + + def filtered_run(self, task: str) -> str: + """ + # Feature 3: Response filtering + flow.add_response_filter("report") + response = flow.filtered_run("Generate a report on finance") + print(response) + """ + raw_response = self.run(task) + return self.apply_response_filters(raw_response) + + def interactive_run( + self, + max_loops: int = 5 + ) -> None: + """Interactive run mode""" + response = input("Start the cnversation") + + for i in range(max_loops): + ai_response = self.streamed_generation(response) + print(f"AI: {ai_response}") + + # Get user input + response = input("You: ") + + def streamed_generation( + self, + prompt: str + ) -> str: + """ + Stream the generation of the response + + Args: + prompt (str): The prompt to use + + Example: + # Feature 4: Streamed generation + response = flow.streamed_generation("Generate a report on finance") + print(response) + + """ + tokens = list(prompt) + response = "" + for token in tokens: + time.sleep(0.1) + response += token + print(token, end="", flush=True) + print() + return response \ No newline at end of file