From 3e6a3f7139e8f985aceed91fa3892e37832013f6 Mon Sep 17 00:00:00 2001 From: Kye Date: Thu, 9 Nov 2023 15:52:29 -0500 Subject: [PATCH] dynamic max loops, + gpt4clean up Former-commit-id: 371da7944e020bb711c8065848f347efbbbd3744 --- README.md | 15 +- code_quality.sh | 0 demos/ui_software_demo.py | 5 + playground/models/gpt4_v.py | 15 ++ pyproject.toml | 6 +- requirements.txt | 1 + sequential_workflow_example.py | 2 +- swarms/__init__.py | 2 +- swarms/memory/weaviate.py | 4 + swarms/models/fuyu.py | 1 - swarms/models/gpt4v.py | 325 ++++++++++++-------------- swarms/models/layoutlm_document_qa.py | 5 +- swarms/models/nougat.py | 16 +- swarms/models/openai_models.py | 4 +- swarms/structs/flow.py | 142 ++++++----- tests/models/mpt7b.py | 1 + 16 files changed, 287 insertions(+), 257 deletions(-) mode change 100644 => 100755 code_quality.sh create mode 100644 demos/ui_software_demo.py create mode 100644 playground/models/gpt4_v.py create mode 100644 swarms/memory/weaviate.py diff --git a/README.md b/README.md index 289a4c22..abc6ab69 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,9 @@ Book a [1-on-1 Session with Kye](https://calendly.com/swarm-corp/30min), the Cre We have a small gallery of examples to run here, [for more check out the docs to build your own agent and or swarms!](https://docs.apac.ai) ### `Flow` Example -- The `Flow` is a superior iteratioin of the `LLMChain` from Langchain, our intent with `Flow` is to create the most reliable loop structure that gives the agents their "autonomy" through 3 main methods of interaction, one through user specified loops, then dynamic where the agent parses a token, and or an interactive human input verison, or a mix of all 3. +- Reliable Structure that provides LLMS autonomy +- Extremely Customizeable with stopping conditions, interactivity, dynamical temperature, loop intervals, and so much more +- Enterprise Grade + Production Grade: `Flow` is designed and optimized for automating real-world tasks at scale! ```python @@ -86,9 +88,10 @@ out = flow.run("Generate a 10,000 word blog on health and wellness.") ------ ### `SequentialWorkflow` -- Execute tasks step by step by passing in an LLM and the task description! -- Pass in flows with various LLMs -- Save and restore Workflow states! +- A Sequential swarm of autonomous agents where each agent's outputs are fed into the next agent +- Save and Restore Workflow states! +- Integrate Flow's with various LLMs and Multi-Modality Models + ```python from swarms.models import OpenAIChat from swarms.structs import Flow @@ -130,7 +133,6 @@ for task in workflow.tasks: ``` - --- ## Documentation @@ -140,6 +142,9 @@ for task in workflow.tasks: ## Contribute - We're always looking for contributors to help us improve and expand this project. If you're interested, please check out our [Contributing Guidelines](CONTRIBUTING.md) and our [contributing board](https://github.com/users/kyegomez/projects/1) +## Community +- [Join the Swarms community here on Discord!](https://discord.gg/AJazBmhKnr) + # License MIT diff --git a/code_quality.sh b/code_quality.sh old mode 100644 new mode 100755 diff --git a/demos/ui_software_demo.py b/demos/ui_software_demo.py new file mode 100644 index 00000000..6271d96e --- /dev/null +++ b/demos/ui_software_demo.py @@ -0,0 +1,5 @@ +""" +Autonomous swarm that optimizes UI autonomously + +GPT4Vision ->> GPT4 ->> UI +""" \ No newline at end of file diff --git a/playground/models/gpt4_v.py b/playground/models/gpt4_v.py new file mode 100644 index 00000000..5e5d7c95 --- /dev/null +++ b/playground/models/gpt4_v.py @@ -0,0 +1,15 @@ +from swarms.models.gpt4v import GPT4Vision + +api_key = "" + +gpt4vision = GPT4Vision( + openai_api_key=api_key, +) + +img = "https://upload.wikimedia.org/wikipedia/commons/thumb/0/0d/VFPt_Solenoid_correct2.svg/640px-VFPt_Solenoid_correct2.svg.png" + +task = "What is this image" + +answer = gpt4vision.run(task, img) + +print(answer) diff --git a/pyproject.toml b/pyproject.toml index 4ea6bffb..c44cf9dc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "2.1.4" +version = "2.1.6" description = "Swarms - Pytorch" license = "MIT" authors = ["Kye Gomez "] @@ -24,7 +24,7 @@ classifiers = [ [tool.poetry.dependencies] python = "^3.8.1" transformers = "*" -openai = "0.28.1" +openai = "*" langchain = "*" asyncio = "*" nest_asyncio = "*" @@ -45,6 +45,8 @@ httpx = "*" tiktoken = "*" attrs = "*" ggl = "*" +ratelimit = "*" + beautifulsoup4 = "*" huggingface-hub = "*" pydantic = "*" diff --git a/requirements.txt b/requirements.txt index f1a5c689..5cb854b9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -36,6 +36,7 @@ tabulate colored griptape addict +ratelimit albumentations basicsr termcolor diff --git a/sequential_workflow_example.py b/sequential_workflow_example.py index 51a48df2..9dc9c828 100644 --- a/sequential_workflow_example.py +++ b/sequential_workflow_example.py @@ -3,7 +3,7 @@ from swarms.structs import Flow from swarms.structs.sequential_workflow import SequentialWorkflow # Example usage -api_key = "" +api_key = "" # Initialize the language flow llm = OpenAIChat( diff --git a/swarms/__init__.py b/swarms/__init__.py index 5de7829b..71481e16 100644 --- a/swarms/__init__.py +++ b/swarms/__init__.py @@ -9,6 +9,6 @@ os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2" from swarms.agents import * from swarms.swarms import * from swarms.structs import * -from swarms.models import * # import * only works when __all__ = [] is defined in __init__.py +from swarms.models import * from swarms.chunkers import * from swarms.workers import * diff --git a/swarms/memory/weaviate.py b/swarms/memory/weaviate.py new file mode 100644 index 00000000..a482f71b --- /dev/null +++ b/swarms/memory/weaviate.py @@ -0,0 +1,4 @@ +""" +Weaviate API Client + +""" diff --git a/swarms/models/fuyu.py b/swarms/models/fuyu.py index d7148d0e..63108835 100644 --- a/swarms/models/fuyu.py +++ b/swarms/models/fuyu.py @@ -29,7 +29,6 @@ class Fuyu: >>> fuyu = Fuyu() >>> fuyu("Hello, my name is", "path/to/image.png") - """ def __init__( diff --git a/swarms/models/gpt4v.py b/swarms/models/gpt4v.py index 87393fab..251744e8 100644 --- a/swarms/models/gpt4v.py +++ b/swarms/models/gpt4v.py @@ -1,30 +1,22 @@ +import asyncio import base64 -import logging -import os -import time +import concurrent.futures +import re from dataclasses import dataclass -from typing import List, Optional, Union +from typing import List, Optional, Tuple +import openai import requests +from cachetools import TTLCache from dotenv import load_dotenv from openai import OpenAI +from ratelimit import limits, sleep_and_retry from termcolor import colored # ENV load_dotenv() -def logging_config(): - """Configures logging""" - logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", - ) - logger = logging.getLogger(__name__) - - return logger - - @dataclass class GPT4VisionResponse: """A response structure for GPT-4""" @@ -56,7 +48,7 @@ class GPT4Vision: -------- process_img(self, img_path: str) -> str: Processes the image to be used for the API request - __call__(self, img: Union[str, List[str]], tasks: List[str]) -> GPT4VisionResponse: + run(self, img: Union[str, List[str]], tasks: List[str]) -> GPT4VisionResponse: Makes a call to the GPT-4 Vision API and returns the image url Example: @@ -66,23 +58,24 @@ class GPT4Vision: >>> answer = gpt4vision(img, tasks) >>> print(answer) - """ max_retries: int = 3 model: str = "gpt-4-vision-preview" backoff_factor: float = 2.0 timeout_seconds: int = 10 - api_key: Optional[str] = None + openai_api_key: Optional[str] = None # 'Low' or 'High' for respesctively fast or high quality, but high more token usage quality: str = "low" # Max tokens to use for the API request, the maximum might be 3,000 but we don't know max_tokens: int = 200 - client = OpenAI( - api_key=api_key, - max_retries=max_retries, - ) - logger = logging_config() + client = OpenAI(api_key=openai_api_key,) + dashboard: bool = True + call_limit: int = 1 + period_seconds: int = 60 + + # Cache for storing API Responses + cache = TTLCache(maxsize=100, ttl=600) # Cache for 10 minutes class Config: """Config class for the GPT4Vision model""" @@ -94,204 +87,172 @@ class GPT4Vision: with open(img, "rb") as image_file: return base64.b64encode(image_file.read()).decode("utf-8") - def __call__( - self, - img: Union[str, List[str]], - tasks: List[str], - ) -> GPT4VisionResponse: - """ - Calls the GPT-4 Vision API and returns the image url - - Parameters: - ----------- - img: Union[str, List[str]] - The image to be used for the API request - tasks: List[str] - The tasks to be used for the API request - - Returns: - -------- - answer: GPT4VisionResponse - The response from the API request - - Example: - -------- - >>> gpt4vision = GPT4Vision() - >>> img = "https://cdn.openai.com/dall-e/encoded/feats/feats_01J9J5ZKJZJY9.png" - >>> tasks = ["A painting of a dog"] - >>> answer = gpt4vision(img, tasks) - >>> print(answer) - - - """ - headers = { - "Content-Type": "application/json", - "Authorization": f"Bearer {self.api_key}", - } - - # Image content - image_content = [{ - "type": "imavge_url", - "image_url": img - } if img.startswith("http") else { - "type": "image", - "data": img - } for img in img] - - messages = [{ - "role": - "user", - "content": - image_content + [{ - "type": "text", - "text": q - } for q in tasks], - }] - - payload = { - "model": "gpt-4-vision-preview", - "messages": messages, - "max_tokens": self.max_tokens, - "detail": self.quality, - } - - for attempt in range(self.max_retries): - try: - response = requests.post( - "https://api.openai.com/v1/chat/completions", - headers=headers, - json=payload, - timeout=self.timeout_seconds, - ) - response.raise_for_status() - answer = response.json( - )["choices"][0]["message"]["content"]["text"] - return GPT4VisionResponse(answer=answer) - except requests.exceptions.HTTPError as error: - self.logger.error( - f"HTTP error: {error.response.status_code}, {error.response.text}" - ) - if error.response.status_code in [429, 500, 503]: - # Exponential backoff = 429(too many requesys) - # And 503 = (Service unavailable) errors - time.sleep(self.backoff_factor**attempt) - else: - break - - except requests.exceptions.RequestException as error: - self.logger.error(f"Request error: {error}") - time.sleep(self.backoff_factor**attempt) - except Exception as error: - self.logger.error( - f"Unexpected Error: {error} try optimizing your api key and try" - " again") - raise error from None - - raise TimeoutError("API Request timed out after multiple retries") - - def run(self, task: str, img: str) -> str: + @sleep_and_retry + @limits(calls=call_limit, + period=period_seconds) # Rate limit of 10 calls per minute + def run(self, task: str, img: str): """ - Runs the GPT-4 Vision API + Run the GPT-4 Vision model - Parameters: - ----------- - task: str - The task to be used for the API request - img: str - The image to be used for the API request + Task: str + The task to run + Img: str + The image to run the task on - Returns: - -------- - out: str - The response from the API request - - Example: - -------- - >>> gpt4vision = GPT4Vision() - >>> task = "A painting of a dog" - >>> img = "https://cdn.openai.com/dall-e/encoded/feats/feats_01J9J5ZKJZJY9.png" - >>> answer = gpt4vision.run(task, img) - >>> print(answer) """ + if self.dashboard: + self.print_dashboard() try: response = self.client.chat.completions.create( - model=self.model, + model="gpt-4-vision-preview", messages=[{ "role": "user", "content": [ { "type": "text", - "text": f"{task}" + "text": task }, { "type": "image_url", - "image_url": f"{img}", + "image_url": { + "url": str(img), + }, }, ], }], max_tokens=self.max_tokens, ) - out = response.choices[0].text + out = print(response.choices[0]) + # out = self.clean_output(out) return out - except Exception as error: - print( - colored( - (f"Error when calling GPT4Vision, Error: {error} Try optimizing" - " your key, and try again"), - "red", - )) - - async def arun(self, task: str, img: str) -> str: + except openai.OpenAIError as e: + # logger.error(f"OpenAI API error: {e}") + return f"OpenAI API error: Could not process the image. {e}" + except Exception as e: + return f"Unexpected error occurred while processing the image. {e}" + + def clean_output(self, output: str): + # Regex pattern to find the Choice object representation in the output + pattern = r"Choice\(.*?\(content=\"(.*?)\".*?\)\)" + match = re.search(pattern, output, re.DOTALL) + + if match: + # Extract the content from the matched pattern + content = match.group(1) + # Replace escaped quotes to get the clean content + content = content.replace(r"\"", '"') + print(content) + else: + print("No content found in the output.") + + async def arun(self, task: str, img: str): """ - Asynchronous run method for GPT-4 Vision + Arun is an async version of run - Parameters: - ----------- - task: str - The task to be used for the API request - img: str - The image to be used for the API request + Task: str + The task to run + Img: str + The image to run the task on - Returns: - -------- - out: str - The response from the API request - - Example: - -------- - >>> gpt4vision = GPT4Vision() - >>> task = "A painting of a dog" - >>> img = "https://cdn.openai.com/dall-e/encoded/feats/feats_01J9J5ZKJZJY9.png" - >>> answer = await gpt4vision.arun(task, img) - >>> print(answer) """ try: response = await self.client.chat.completions.create( - model=self.model, + model="gpt-4-vision-preview", messages=[{ "role": "user", "content": [ { "type": "text", - "text": f"{task}" + "text": task }, { "type": "image_url", - "image_url": f"{img}", + "image_url": { + "url": img, + }, }, ], }], max_tokens=self.max_tokens, ) - out = response.choices[0].text - return out - except Exception as error: - print( - colored( - (f"Error when calling GPT4Vision, Error: {error} Try optimizing" - " your key, and try again"), - "red", - )) + + return print(response.choices[0]) + except openai.OpenAIError as e: + # logger.error(f"OpenAI API error: {e}") + return f"OpenAI API error: Could not process the image. {e}" + except Exception as e: + return f"Unexpected error occurred while processing the image. {e}" + + def run_batch(self, tasks_images: List[Tuple[str, str]]) -> List[str]: + """Process a batch of tasks and images""" + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [ + executor.submit(self.run, task, img) + for task, img in tasks_images + ] + results = [future.result() for future in futures] + return results + + async def run_batch_async(self, + tasks_images: List[Tuple[str, str]]) -> List[str]: + """Process a batch of tasks and images asynchronously""" + loop = asyncio.get_event_loop() + futures = [ + loop.run_in_executor(None, self.run, task, img) + for task, img in tasks_images + ] + return await asyncio.gather(*futures) + + async def run_batch_async_with_retries( + self, tasks_images: List[Tuple[str, str]]) -> List[str]: + """Process a batch of tasks and images asynchronously with retries""" + loop = asyncio.get_event_loop() + futures = [ + loop.run_in_executor(None, self.run_with_retries, task, img) + for task, img in tasks_images + ] + return await asyncio.gather(*futures) + + def print_dashboard(self): + dashboard = print( + colored( + f""" + GPT4Vision Dashboard + ------------------- + Max Retries: {self.max_retries} + Model: {self.model} + Backoff Factor: {self.backoff_factor} + Timeout Seconds: {self.timeout_seconds} + Image Quality: {self.quality} + Max Tokens: {self.max_tokens} + + """, + "green", + )) + return dashboard + + def health_check(self): + """Health check for the GPT4Vision model""" + try: + response = requests.get("https://api.openai.com/v1/engines") + return response.status_code == 200 + except requests.RequestException as error: + print(f"Health check failed: {error}") + return False + + def sanitize_input(self, text: str) -> str: + """ + Sanitize input to prevent injection attacks. + + Parameters: + text: str - The input text to be sanitized. + + Returns: + The sanitized text. + """ + # Example of simple sanitization, this should be expanded based on the context and usage + sanitized_text = re.sub(r"[^\w\s]", "", text) + return sanitized_text diff --git a/swarms/models/layoutlm_document_qa.py b/swarms/models/layoutlm_document_qa.py index 26734a25..e2b8d1e4 100644 --- a/swarms/models/layoutlm_document_qa.py +++ b/swarms/models/layoutlm_document_qa.py @@ -4,6 +4,7 @@ visual question answering on real world docs lik invoice, pdfs, etc """ from transformers import pipeline + class LayoutLMDocumentQA: """ LayoutLMDocumentQA for document question answering: @@ -23,9 +24,9 @@ class LayoutLMDocumentQA: def __init__( self, model_name: str = "impira/layoutlm-document-qa", - task: str = "document-question-answering", + task_type: str = "document-question-answering", ): - self.pipeline = pipeline(self.task, model=self.model_name) + self.pipeline = pipeline(self.task_type, model=self.model_name) def __call__(self, task: str, img_path: str): """Call for model""" diff --git a/swarms/models/nougat.py b/swarms/models/nougat.py index 9dee7d1b..4de1d952 100644 --- a/swarms/models/nougat.py +++ b/swarms/models/nougat.py @@ -75,13 +75,17 @@ class Nougat: def clean_nougat_output(raw_output): # Define the pattern to extract the relevant data - daily_balance_pattern = r"\*\*(\d{2}/\d{2}/\d{4})\*\*\n\n\*\*([\d,]+\.\d{2})\*\*" - + daily_balance_pattern = ( + r"\*\*(\d{2}/\d{2}/\d{4})\*\*\n\n\*\*([\d,]+\.\d{2})\*\*") + # Find all matches of the pattern matches = re.findall(daily_balance_pattern, raw_output) - + # Convert the matches to a readable format - cleaned_data = ["Date: {}, Amount: {}".format(date, amount.replace(',', '')) for date, amount in matches] - + cleaned_data = [ + "Date: {}, Amount: {}".format(date, amount.replace(",", "")) + for date, amount in matches + ] + # Join the cleaned data with new lines for readability - return '\n'.join(cleaned_data) + return "\n".join(cleaned_data) diff --git a/swarms/models/openai_models.py b/swarms/models/openai_models.py index e1a327b5..128169a3 100644 --- a/swarms/models/openai_models.py +++ b/swarms/models/openai_models.py @@ -493,7 +493,7 @@ class BaseOpenAI(BaseLLM): openai.proxy = { "http": self.openai_proxy, - "https": self.openai_proxy + "https": self.openai_proxy, } # type: ignore[assignment] # noqa: E501 return {**openai_creds, **self._default_params} @@ -783,7 +783,7 @@ class OpenAIChat(BaseLLM): if openai_proxy: openai.proxy = { "http": openai_proxy, - "https": openai_proxy + "https": openai_proxy, } # type: ignore[assignment] # noqa: E501 except ImportError: raise ImportError("Could not import openai python package. " diff --git a/swarms/structs/flow.py b/swarms/structs/flow.py index a7a19258..a3633a2c 100644 --- a/swarms/structs/flow.py +++ b/swarms/structs/flow.py @@ -8,9 +8,9 @@ TODO: - add async processing for run and batch run - add plan module - concurrent -- +- Add batched inputs """ - +import asyncio import json import logging import time @@ -100,24 +100,26 @@ class Flow: self, llm: Any, # template: str, - max_loops: int = 5, + max_loops = 5, stopping_condition: Optional[Callable[[str], bool]] = None, loop_interval: int = 1, retry_attempts: int = 3, retry_interval: int = 1, + return_history: bool = False, + dynamic_loops: Optional[bool] = False, interactive: bool = False, dashboard: bool = False, - name: str = "Flow agent", + agent_name: str = "Flow agent", system_prompt: str = FLOW_SYSTEM_PROMPT, # tools: List[BaseTool] = None, dynamic_temperature: bool = False, saved_state_path: Optional[str] = "flow_state.json", autosave: bool = False, context_length: int = 8192, + user_name: str = "Human", **kwargs: Any, ): self.llm = llm - # self.template = template self.max_loops = max_loops self.stopping_condition = stopping_condition self.loop_interval = loop_interval @@ -130,9 +132,14 @@ class Flow: self.interactive = interactive self.dashboard = dashboard self.dynamic_temperature = dynamic_temperature + self.dynamic_loops = dynamic_loops + self.user_name = user_name + # The max_loops will be set dynamically if the dynamic_loop + if self.dynamic_loops: + self.max_loops = "auto" # self.tools = tools self.system_prompt = system_prompt - self.name = name + self.agent_name = agent_name self.saved_state_path = saved_state_path self.autosave = autosave self.response_filters = [] @@ -194,7 +201,7 @@ class Flow: def add_task_to_memory(self, task: str): """Add the task to the memory""" - self.memory.append([f"Human: {task}"]) + self.memory.append([f"{self.user_name}: {task}"]) def add_message_to_memory(self, message: str): """Add the message to the memory""" @@ -222,7 +229,7 @@ class Flow: ---------------------------------------- Flow Configuration: - Name: {self.name} + Name: {self.agent_name} System Prompt: {self.system_prompt} Task: {task} Max Loops: {self.max_loops} @@ -277,47 +284,40 @@ class Flow: 5. Repeat until stopping condition is met or max_loops is reached """ - # Restore from saved state if provided, ortherwise start with a new history - # if self.saved_state: - # self.load_state(self.saved_state) - # history = self.memory[-1] - # print(f"Loaded state from {self.saved_state}") - # else: - # history = [f"Human: {task}"] - # self.memory.append(history) - - # print(colored(">>> Autonomous Agent Activated", "cyan", attrs=["bold"])) + # Activate Autonomous agent message self.activate_autonomous_agent() - # if self.autosave: - response = task - history = [f"Human: {task}"] + history = [f"{self.user_name}: {task}"] # If dashboard = True then print the dashboard if self.dashboard: self.print_dashboard(task) - for i in range(self.max_loops): - print(colored(f"\nLoop {i+1} of {self.max_loops}", "blue")) + loop_count = 0 + # for i in range(self.max_loops): + while self.max_loops == 'auto' or loop_count < self.max_loops: + loop_count += 1 + print(colored(f"\nLoop {loop_count} of {self.max_loops}", "blue")) print("\n") - if self._check_stopping_condition(response) or parse_done_token( - response): + + if self._check_stopping_condition(response) or parse_done_token(response): break # Adjust temperature, comment if no work if self.dynamic_temperature: self.dynamic_temperature() + # Preparing the prompt + task = self.agent_history_prompt(FLOW_SYSTEM_PROMPT, response) + attempt = 0 while attempt < self.retry_attempts: try: response = self.llm( - self.agent_history_prompt(FLOW_SYSTEM_PROMPT, response), + task **kwargs, ) - # print(f"Next query: {response}") - # break if self.interactive: print(f"AI: {response}") history.append(f"AI: {response}") @@ -341,13 +341,14 @@ class Flow: print(colored(f"Autosaving flow state to {save_path}", "green")) self.save_state(save_path) - return response # , history + if self.return_history: + return response, history + + return response async def arun(self, task: str, **kwargs): - """Async run""" - pass """ - Run the autonomous agent loop + Run the autonomous agent loop aschnronously Args: task (str): The initial task to run @@ -360,44 +361,40 @@ class Flow: 5. Repeat until stopping condition is met or max_loops is reached """ - # Restore from saved state if provided, ortherwise start with a new history - # if self.saved_state: - # self.load_state(self.saved_state) - # history = self.memory[-1] - # print(f"Loaded state from {self.saved_state}") - # else: - # history = [f"Human: {task}"] - # self.memory.append(history) - - print(colored(">>> Autonomous Agent Activated", "cyan", attrs=["bold"])) + # Activate Autonomous agent message + self.activate_autonomous_agent() response = task - history = [f"Human: {task}"] + history = [f"{self.user_name}: {task}"] # If dashboard = True then print the dashboard if self.dashboard: self.print_dashboard(task) - for i in range(self.max_loops): - print(colored(f"\nLoop {i+1} of {self.max_loops}", "blue")) + loop_count = 0 + # for i in range(self.max_loops): + while self.max_loops == 'auto' or loop_count < self.max_loops: + loop_count += 1 + print(colored(f"\nLoop {loop_count} of {self.max_loops}", "blue")) print("\n") - if self._check_stopping_condition(response) or parse_done_token( - response): + + if self._check_stopping_condition(response) or parse_done_token(response): break # Adjust temperature, comment if no work if self.dynamic_temperature: self.dynamic_temperature() + # Preparing the prompt + task = self.agent_history_prompt(FLOW_SYSTEM_PROMPT, response) + attempt = 0 while attempt < self.retry_attempts: try: response = self.llm( - self.agent_history_prompt(FLOW_SYSTEM_PROMPT, response), + task **kwargs, ) - # print(f"Next query: {response}") - # break if self.interactive: print(f"AI: {response}") history.append(f"AI: {response}") @@ -416,10 +413,15 @@ class Flow: time.sleep(self.loop_interval) self.memory.append(history) - # if self.autosave: - # self.save_state("flow_state.json") + if self.autosave: + save_path = self.saved_state_path or "flow_state.json" + print(colored(f"Autosaving flow state to {save_path}", "green")) + self.save_state(save_path) - return response # , history + if self.return_history: + return response, history + + return response def _run(self, **kwargs: Any) -> str: """Generate a result using the provided keyword args.""" @@ -451,6 +453,19 @@ class Flow: """ return agent_history_prompt + async def run_concurrent(self, tasks: List[str], **kwargs): + """ + Run a batch of tasks concurrently and handle an infinite level of task inputs. + + Args: + tasks (List[str]): A list of tasks to run. + """ + task_coroutines = [ + self.run_async(task, **kwargs) for task in tasks + ] + completed_tasks = await asyncio.gather(*task_coroutines) + return completed_tasks + def bulk_run(self, inputs: List[Dict[str, Any]]) -> List[str]: """Generate responses for multiple input sets.""" return [self.run(**input_data) for input_data in inputs] @@ -666,7 +681,8 @@ class Flow: def get_llm_params(self): """ Extracts and returns the parameters of the llm object for serialization. - It assumes that the llm object has an __init__ method with parameters that can be used to recreate it. + It assumes that the llm object has an __init__ method + with parameters that can be used to recreate it. """ if not hasattr(self.llm, "__init__"): return None @@ -770,8 +786,24 @@ class Flow: Your response: """ response = self.llm(prompt, **kwargs) - return {"role": self.name, "content": response} + return {"role": self.agent_name, "content": response} def update_system_prompt(self, system_prompt: str): """Upddate the system message""" self.system_prompt = system_prompt + + def update_max_loops(self, max_loops: int): + """Update the max loops""" + self.max_loops = max_loops + + def update_loop_interval(self, loop_interval: int): + """Update the loop interval""" + self.loop_interval = loop_interval + + def update_retry_attempts(self, retry_attempts: int): + """Update the retry attempts""" + self.retry_attempts = retry_attempts + + def update_retry_interval(self, retry_interval: int): + """Update the retry interval""" + self.retry_interval = retry_interval diff --git a/tests/models/mpt7b.py b/tests/models/mpt7b.py index cdbd57f6..dfde578d 100644 --- a/tests/models/mpt7b.py +++ b/tests/models/mpt7b.py @@ -1,5 +1,6 @@ import pytest from transformers import AutoModelForCausalLM, AutoTokenizer + from swarms.models.mpt import MPT7B