[FEAT][SequentialWorkflow][Fix]

pull/393/head
Kye 11 months ago
parent ce0dd1f5a6
commit 490e278385

Binary file not shown (image added, 40 KiB).

@ -1,5 +1,7 @@
# The Limits of Individual Agents
![Reliable Agents](docs/assets/img/reliabilitythrough.png)
- Context Window Limits
- Single Task Execution
- Hallucination

@ -0,0 +1,10 @@
from swarms.models.azure_openai_llm import AzureOpenAI

# Initialize Azure OpenAI
model = AzureOpenAI()

# Run the model
model(
    "Create a youtube script for a video on how to use the swarms"
    " framework"
)
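For this example to run, the AzureOpenAI wrapper introduced later in this diff reads its configuration from environment variables. A minimal sketch, assuming the variable names used by that class; the key, endpoint, and API version values are placeholders:

import os

# Placeholders; the variable names come from the AzureOpenAI class below.
os.environ["AZURE_OPENAI_API_KEY"] = "<your-api-key>"
os.environ["AZURE_OPENAI_ENDPOINT"] = "https://example-resource.azure.openai.com/"
os.environ["OPENAI_API_VERSION"] = "2023-05-15"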

@ -75,6 +75,7 @@ supervision = "*"
scikit-image = "*"
pinecone-client = "*"
roboflow = "*"
langchain-core = "0.1.27"

@ -3,6 +3,7 @@ transformers
pandas==1.5.3
langchain==0.0.333
langchain-experimental==0.0.10
langchain-core==0.1.27
httpx==0.24.1
Pillow==9.4.0
faiss-cpu==1.7.4

@ -0,0 +1,39 @@
from swarms import Agent, OpenAIChat, SequentialWorkflow

# Example usage
llm = OpenAIChat(
    temperature=0.5,
    max_tokens=3000,
)

# Initialize the Agent with the language model
agent1 = Agent(
    agent_name="John the writer",
    llm=llm,
    max_loops=1,
    dashboard=False,
)

# Create another Agent for a different task
agent2 = Agent("Summarizer", llm=llm, max_loops=1, dashboard=False)

# Create the workflow
workflow = SequentialWorkflow(
    name="Blog Generation Workflow",
    description=(
        "Generate a youtube transcript on how to deploy agents into"
        " production"
    ),
    max_loops=1,
    autosave=True,
    dashboard=False,
    agents=[agent1, agent2],
)

# Run the workflow
workflow.run()

# Output the results
# for task in workflow.tasks:
#     print(f"Task: {task.description}, Result: {task.result}")

@ -0,0 +1,223 @@
from __future__ import annotations

import logging
import os
from typing import Any, Callable, Dict, List, Mapping, Optional, Union

import openai
from langchain_core.pydantic_v1 import (
    Field,
    SecretStr,
    root_validator,
)
from langchain_core.utils import (
    convert_to_secret_str,
    get_from_dict_or_env,
)
from langchain_openai.llms.base import BaseOpenAI

logger = logging.getLogger(__name__)


class AzureOpenAI(BaseOpenAI):
    """Azure-specific OpenAI large language models.

    To use, you should have the ``openai`` python package installed, and the
    environment variable ``OPENAI_API_KEY`` set with your API key.

    Any parameters that are valid to be passed to the openai.create call can
    be passed in, even if not explicitly saved on this class.

    Example:
        .. code-block:: python

            from swarms import AzureOpenAI

            openai = AzureOpenAI(model_name="gpt-3.5-turbo-instruct")
    """

    azure_endpoint: Union[str, None] = None
    """Your Azure endpoint, including the resource.

    Automatically inferred from env var `AZURE_OPENAI_ENDPOINT` if not provided.

    Example: `https://example-resource.azure.openai.com/`
    """
    deployment_name: Union[str, None] = Field(
        default=None, alias="azure_deployment"
    )
    """A model deployment.

    If given, sets the base client URL to include
    `/deployments/{azure_deployment}`. Note: this means you won't be able to
    use non-deployment endpoints.
    """
    openai_api_version: str = Field(default="", alias="api_version")
    """Automatically inferred from env var `OPENAI_API_VERSION` if not provided."""
    openai_api_key: Optional[SecretStr] = Field(
        default=None, alias="api_key"
    )
    """Automatically inferred from env var `AZURE_OPENAI_API_KEY` if not provided."""
    azure_ad_token: Optional[SecretStr] = None
    """Your Azure Active Directory token.

    Automatically inferred from env var `AZURE_OPENAI_AD_TOKEN` if not provided.

    For more:
    https://www.microsoft.com/en-us/security/business/identity-access/microsoft-entra-id.
    """  # noqa: E501
    azure_ad_token_provider: Union[Callable[[], str], None] = None
    """A function that returns an Azure Active Directory token.

    Will be invoked on every request.
    """
    openai_api_type: str = ""
    """Legacy, for openai<1.0.0 support."""
    validate_base_url: bool = True
    """For backwards compatibility. If legacy val openai_api_base is passed in,
    try to infer if it is a base_url or azure_endpoint and update accordingly.
    """

    @classmethod
    def get_lc_namespace(cls) -> List[str]:
        """Get the namespace of the langchain object."""
        return ["langchain", "llms", "openai"]

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that the api key and python package exist in the environment."""
        if values["n"] < 1:
            raise ValueError("n must be at least 1.")
        if values["streaming"] and values["n"] > 1:
            raise ValueError("Cannot stream results when n > 1.")
        if values["streaming"] and values["best_of"] > 1:
            raise ValueError(
                "Cannot stream results when best_of > 1."
            )

        # Check OPENAI_KEY for backwards compatibility.
        # TODO: Remove OPENAI_API_KEY support to avoid possible conflict when
        # using other forms of azure credentials.
        openai_api_key = (
            values["openai_api_key"]
            or os.getenv("AZURE_OPENAI_API_KEY")
            or os.getenv("OPENAI_API_KEY")
        )
        values["openai_api_key"] = (
            convert_to_secret_str(openai_api_key)
            if openai_api_key
            else None
        )
        values["azure_endpoint"] = values["azure_endpoint"] or os.getenv(
            "AZURE_OPENAI_ENDPOINT"
        )
        azure_ad_token = values["azure_ad_token"] or os.getenv(
            "AZURE_OPENAI_AD_TOKEN"
        )
        values["azure_ad_token"] = (
            convert_to_secret_str(azure_ad_token)
            if azure_ad_token
            else None
        )
        values["openai_api_base"] = values["openai_api_base"] or os.getenv(
            "OPENAI_API_BASE"
        )
        values["openai_proxy"] = get_from_dict_or_env(
            values,
            "openai_proxy",
            "OPENAI_PROXY",
            default="",
        )
        values["openai_organization"] = (
            values["openai_organization"]
            or os.getenv("OPENAI_ORG_ID")
            or os.getenv("OPENAI_ORGANIZATION")
        )
        values["openai_api_version"] = values["openai_api_version"] or os.getenv(
            "OPENAI_API_VERSION"
        )
        values["openai_api_type"] = get_from_dict_or_env(
            values,
            "openai_api_type",
            "OPENAI_API_TYPE",
            default="azure",
        )

        # For backwards compatibility. Before openai v1, no distinction was
        # made between azure_endpoint and base_url (openai_api_base).
        openai_api_base = values["openai_api_base"]
        if openai_api_base and values["validate_base_url"]:
            if "/openai" not in openai_api_base:
                values["openai_api_base"] = (
                    values["openai_api_base"].rstrip("/") + "/openai"
                )
                raise ValueError(
                    "As of openai>=1.0.0, Azure endpoints should be"
                    " specified via the `azure_endpoint` param not"
                    " `openai_api_base` (or alias `base_url`)."
                )
            if values["deployment_name"]:
                raise ValueError(
                    "As of openai>=1.0.0, if `deployment_name` (or"
                    " alias `azure_deployment`) is specified then"
                    " `openai_api_base` (or alias `base_url`) should"
                    " not be. Instead use `deployment_name` (or alias"
                    " `azure_deployment`) and `azure_endpoint`."
                )
            values["deployment_name"] = None

        client_params = {
            "api_version": values["openai_api_version"],
            "azure_endpoint": values["azure_endpoint"],
            "azure_deployment": values["deployment_name"],
            "api_key": (
                values["openai_api_key"].get_secret_value()
                if values["openai_api_key"]
                else None
            ),
            "azure_ad_token": (
                values["azure_ad_token"].get_secret_value()
                if values["azure_ad_token"]
                else None
            ),
            "azure_ad_token_provider": values["azure_ad_token_provider"],
            "organization": values["openai_organization"],
            "base_url": values["openai_api_base"],
            "timeout": values["request_timeout"],
            "max_retries": values["max_retries"],
            "default_headers": values["default_headers"],
            "default_query": values["default_query"],
            "http_client": values["http_client"],
        }
        values["client"] = openai.AzureOpenAI(**client_params).completions
        values["async_client"] = openai.AsyncAzureOpenAI(
            **client_params
        ).completions

        return values

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        return {
            **{"deployment_name": self.deployment_name},
            **super()._identifying_params,
        }

    @property
    def _invocation_params(self) -> Dict[str, Any]:
        openai_params = {"model": self.deployment_name}
        return {**openai_params, **super()._invocation_params}

    @property
    def _llm_type(self) -> str:
        """Return type of llm."""
        return "azure"

    @property
    def lc_attributes(self) -> Dict[str, Any]:
        return {
            "openai_api_type": self.openai_api_type,
            "openai_api_version": self.openai_api_version,
        }
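A hedged instantiation sketch built only from the fields defined above; the endpoint, deployment name, and API version are placeholders for a real Azure resource:

from swarms.models.azure_openai_llm import AzureOpenAI

# All values are placeholders; `azure_endpoint`, `deployment_name`
# (alias azure_deployment), and `openai_api_version` (alias api_version)
# are fields on the class above.
llm = AzureOpenAI(
    azure_endpoint="https://example-resource.azure.openai.com/",
    deployment_name="gpt-35-turbo-instruct",
    openai_api_version="2023-05-15",
)
print(llm("Say hello"))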

@ -671,9 +671,9 @@ class Agent:
            ):
                break

-            if self.parse_done_token:
-                if parse_done_token(response):
-                    break
+            # if self.parse_done_token:
+            #     if parse_done_token(response):
+            #         break

            if self.stopping_func is not None:
                if self.stopping_func(response) is True:
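With the token-based check commented out, stopping_func remains the early-exit hook for the agent loop. A minimal sketch, assuming Agent accepts a stopping_func callable at construction (inferred from the self.stopping_func attribute used above, not confirmed by this diff):

def stop_when_done(response: str) -> bool:
    # End the loop once the model emits a sentinel marker.
    return "<DONE>" in response

# `stopping_func` as a constructor argument is an assumption inferred
# from the attribute referenced in the loop above.
agent = Agent(llm=llm, max_loops=5, stopping_func=stop_when_done)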

@ -5,7 +5,8 @@ from termcolor import colored
from swarms.structs.base import BaseStructure
from swarms.structs.task import Task
from swarms.structs.agent import Agent
from swarms.utils.loguru_logger import logger
class BaseWorkflow(BaseStructure):
"""
@ -14,18 +15,27 @@ class BaseWorkflow(BaseStructure):
    Attributes:
        task_pool (list): A list to store tasks.

    Methods:
        add(task: Task = None, tasks: List[Task] = None, *args, **kwargs):
            Adds a task or a list of tasks to the task pool.
        run():
            Abstract method to run the workflow.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.task_pool = []
        self.agent_pool = []

        # Logging
        logger.info("Number of agents activated:")
        if self.agent_pool:  # was `self.agents`, which is never set on BaseWorkflow
            logger.info(f"Agents: {len(self.agent_pool)}")
        else:
            logger.info("No agents activated.")

        if self.task_pool:
            logger.info(f"Task Pool Size: {len(self.task_pool)}")
        else:
            logger.info("Task Pool is empty.")

-    def add(
+    def add_task(
        self,
        task: Task = None,
        tasks: List[Task] = None,
@ -50,6 +60,14 @@ class BaseWorkflow(BaseStructure):
            raise ValueError(
                "You must provide a task or a list of tasks"
            )

    def add_agent(
        self,
        agent: Agent,
        *args,
        **kwargs,
    ):
        # `self.agent_pool(agent)` would raise TypeError (a list is not
        # callable); appending is the evident intent.
        return self.agent_pool.append(agent)

    def run(self):
        """
@ -318,3 +336,56 @@ class BaseWorkflow(BaseStructure):
"red",
)
)
    def workflow_dashboard(self, **kwargs) -> None:
        """
        Displays a dashboard for the workflow.

        Args:
            **kwargs: Additional keyword arguments to pass to the dashboard.

        Examples:
            >>> from swarms.models import OpenAIChat
            >>> from swarms.structs import SequentialWorkflow
            >>> llm = OpenAIChat(openai_api_key="")
            >>> workflow = SequentialWorkflow(max_loops=1)
            >>> workflow.add("What's the weather in miami", llm)
            >>> workflow.add("Create a report on these metrics", llm)
            >>> workflow.workflow_dashboard()
        """
        print(
            colored(
                f"""
                Sequential Workflow Dashboard
                --------------------------------
                Name: {self.name}
                Description: {self.description}
                task_pool: {len(self.task_pool)}
                Max Loops: {self.max_loops}
                Autosave: {self.autosave}
                Autosave Filepath: {self.saved_state_filepath}
                Restore Filepath: {self.restore_state_filepath}
                --------------------------------
                Metadata:
                kwargs: {kwargs}
                """,
                "cyan",
                attrs=["bold", "underline"],
            )
        )

    def workflow_bootup(self, **kwargs) -> None:
        """
        Workflow bootup.
        """
        print(
            colored(
                """
                Sequential Workflow Initializing...""",
                "green",
                attrs=["bold", "underline"],
            )
        )
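A short usage sketch of the new pools; only the add_task and add_agent signatures come from this diff, and constructing a Task with just a description is an assumption:

from swarms.structs.base_workflow import BaseWorkflow
from swarms.structs.task import Task

workflow = BaseWorkflow()

# add_task accepts a single task or a list of tasks.
workflow.add_task(task=Task(description="Draft an outline"))
workflow.add_task(
    tasks=[Task(description="Write the post"), Task(description="Edit it")]
)

# add_agent appends an Agent to the agent pool.
workflow.add_agent(agent1)  # agent1 as built in the earlier example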

@ -10,6 +10,7 @@ from swarms.structs.task import Task
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.utils.loguru_logger import logger
from swarms.structs.base_workflow import BaseWorkflow
# SequentialWorkflow class definition using dataclasses
@ -61,8 +62,16 @@ class SequentialWorkflow:
        )

        # Logging
-        logger.info(f"Number of agents activated: {len(self.agents)}")
-        logger.info(f"Task Pool Size: {self.task_pool}")
+        logger.info("Number of agents activated:")
+        if self.agents:
+            logger.info(f"Agents: {len(self.agents)}")
+        else:
+            logger.info("No agents activated.")
+
+        if self.task_pool:
+            logger.info(f"Task Pool Size: {len(self.task_pool)}")
+        else:
+            logger.info("Task Pool is empty.")

    def add(
        self,
@ -81,7 +90,6 @@ class SequentialWorkflow:
            *args: Additional arguments to pass to the task execution.
            **kwargs: Additional keyword arguments to pass to the task execution.
        """
-        logger.info("A")
        for agent in self.agents:
            out = agent(str(self.description))
            self.conversation.add(agent.agent_name, out)
@ -169,217 +177,65 @@ class SequentialWorkflow:
                ),
            )

-    def save_workflow_state(
-        self,
-        filepath: Optional[str] = "sequential_workflow_state.json",
-        **kwargs,
-    ) -> None:
-        """
-        Saves the workflow state to a json file.
-
-        Args:
-            filepath (str): The path to save the workflow state to.
-
-        Examples:
-            >>> from swarms.models import OpenAIChat
-            >>> from swarms.structs import SequentialWorkflow
-            >>> llm = OpenAIChat(openai_api_key="")
-            >>> workflow = SequentialWorkflow(max_loops=1)
-            >>> workflow.add("What's the weather in miami", llm)
-            >>> workflow.add("Create a report on these metrics", llm)
-            >>> workflow.save_workflow_state("sequential_workflow_state.json")
-        """
-        try:
-            filepath = filepath or self.saved_state_filepath
-
-            with open(filepath, "w") as f:
-                # Saving the state as a json for simplicity
-                state = {
-                    "task_pool": [
-                        {
-                            "description": task.description,
-                            "args": task.args,
-                            "kwargs": task.kwargs,
-                            "result": task.result,
-                            "history": task.history,
-                        }
-                        for task in self.task_pool
-                    ],
-                    "max_loops": self.max_loops,
-                }
-                json.dump(state, f, indent=4)
-
-            logger.info(
-                "[INFO][SequentialWorkflow] Saved workflow state to"
-                f" {filepath}"
-            )
-        except Exception as error:
-            logger.error(
-                colored(
-                    f"Error saving workflow state: {error}",
-                    "red",
-                )
-            )

-    def workflow_bootup(self, **kwargs) -> None:
-        """
-        Workflow bootup.
-        """
-        print(
-            colored(
-                """
-                Sequential Workflow Initializing...""",
-                "green",
-                attrs=["bold", "underline"],
-            )
-        )

-    def workflow_dashboard(self, **kwargs) -> None:
-        """
-        Displays a dashboard for the workflow.
-
-        Args:
-            **kwargs: Additional keyword arguments to pass to the dashboard.
-
-        Examples:
-            >>> from swarms.models import OpenAIChat
-            >>> from swarms.structs import SequentialWorkflow
-            >>> llm = OpenAIChat(openai_api_key="")
-            >>> workflow = SequentialWorkflow(max_loops=1)
-            >>> workflow.add("What's the weather in miami", llm)
-            >>> workflow.add("Create a report on these metrics", llm)
-            >>> workflow.workflow_dashboard()
-        """
-        print(
-            colored(
-                f"""
-                Sequential Workflow Dashboard
-                --------------------------------
-                Name: {self.name}
-                Description: {self.description}
-                task_pool: {len(self.task_pool)}
-                Max Loops: {self.max_loops}
-                Autosave: {self.autosave}
-                Autosave Filepath: {self.saved_state_filepath}
-                Restore Filepath: {self.restore_state_filepath}
-                --------------------------------
-                Metadata:
-                kwargs: {kwargs}
-                """,
-                "cyan",
-                attrs=["bold", "underline"],
-            )
-        )

-    def workflow_shutdown(self, **kwargs) -> None:
-        """Shuts down the workflow."""
-        print(
-            colored(
-                """
-                Sequential Workflow Shutdown...""",
-                "red",
-                attrs=["bold", "underline"],
-            )
-        )

-    def load_workflow_state(
-        self, filepath: str = None, **kwargs
-    ) -> None:
-        """
-        Loads the workflow state from a json file and restores the workflow state.
-
-        Args:
-            filepath (str): The path to load the workflow state from.
-
-        Examples:
-            >>> from swarms.models import OpenAIChat
-            >>> from swarms.structs import SequentialWorkflow
-            >>> llm = OpenAIChat(openai_api_key="")
-            >>> workflow = SequentialWorkflow(max_loops=1)
-            >>> workflow.add("What's the weather in miami", llm)
-            >>> workflow.add("Create a report on these metrics", llm)
-            >>> workflow.save_workflow_state("sequential_workflow_state.json")
-            >>> workflow.load_workflow_state("sequential_workflow_state.json")
-        """
-        try:
-            filepath = filepath or self.restore_state_filepath
-
-            with open(filepath) as f:
-                state = json.load(f)
-
-            self.max_loops = state["max_loops"]
-            self.task_pool = []
-            for task_state in state["task_pool"]:
-                task = Task(
-                    description=task_state["description"],
-                    agent=task_state["agent"],
-                    args=task_state["args"],
-                    kwargs=task_state["kwargs"],
-                    result=task_state["result"],
-                    history=task_state["history"],
-                )
-                self.task_pool.append(task)
-
-            print(
-                "[INFO][SequentialWorkflow] Loaded workflow state"
-                f" from {filepath}"
-            )
-        except Exception as error:
-            logger.error(
-                colored(
-                    f"Error loading workflow state: {error}",
-                    "red",
-                )
-            )
    def run(self) -> None:
        """
        Run the workflow.

        Raises:
-            ValueError: If a Agent instance is used as a task and the 'task' argument is not provided.
+            ValueError: If an Agent instance is used as a task and the 'task' argument is not provided.
        """
-        try:
-            self.workflow_bootup()
-            loops = 0
-            while loops < self.max_loops:
-                for i in range(len(self.task_pool)):
-                    task = self.task_pool[i]
-                    # Check if the current task can be executed
-                    if task.result is None:
-                        # Get the inputs for the current task
-                        task.context(task)
-
-                        result = task.execute()
-
-                        # Pass the inputs to the next task
-                        if i < len(self.task_pool) - 1:
-                            next_task = self.task_pool[i + 1]
-                            next_task.description = result
-
-                        # Execute the current task
-                        task.execute()
-
-                # Autosave the workflow state
-                if self.autosave:
-                    self.save_workflow_state(
-                        "sequential_workflow_state.json"
-                    )
-
-            self.workflow_shutdown()
-            loops += 1
-        except Exception as e:
-            logger.error(
-                colored(
-                    (
-                        "Error initializing the Sequential workflow:"
-                        f" {e} try optimizing your inputs like the"
-                        " agent class and task description"
-                    ),
-                    "red",
-                    attrs=["bold", "underline"],
-                )
-            )
+        self.workflow_bootup()
+        loops = 0
+        while loops < self.max_loops:
+            for i, agent in enumerate(self.agents):
+                logger.info(f"Agent {i + 1} is executing the task.")
+                out = agent(self.description)
+                self.conversation.add(agent.agent_name, str(out))
+                prompt = self.conversation.return_history_as_string()
+                print(prompt)
+                print("Next agent...")
+                out = agent(prompt)
+            loops += 1  # needed so the while loop terminates when max_loops > 1
+        return out
+        # try:
+        #     self.workflow_bootup()
+        #     loops = 0
+        #     while loops < self.max_loops:
+        #         for i in range(len(self.task_pool)):
+        #             task = self.task_pool[i]
+        #             # Check if the current task can be executed
+        #             if task.result is None:
+        #                 # Get the inputs for the current task
+        #                 task.context(task)
+        #
+        #                 result = task.execute()
+        #
+        #                 # Pass the inputs to the next task
+        #                 if i < len(self.task_pool) - 1:
+        #                     next_task = self.task_pool[i + 1]
+        #                     next_task.description = result
+        #
+        #                 # Execute the current task
+        #                 task.execute()
+        #
+        #         # Autosave the workflow state
+        #         if self.autosave:
+        #             self.save_workflow_state(
+        #                 "sequential_workflow_state.json"
+        #             )
+        #     self.workflow_shutdown()
+        #     loops += 1
+        # except Exception as e:
+        #     logger.error(
+        #         colored(
+        #             (
+        #                 "Error initializing the Sequential workflow:"
+        #                 f" {e} try optimizing your inputs like the"
+        #                 " agent class and task description"
+        #             ),
+        #             "red",
+        #             attrs=["bold", "underline"],
+        #         )
+        #     )

@ -1,6 +1,6 @@
from loguru import logger

-logger = logger.add(
+logger.add(
    "MessagePool.log",
    level="INFO",
    colorize=True,
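This small change matters: loguru's logger.add() returns an integer handler id rather than a logger object, so the old assignment rebound the module's logger to an int and broke every later logging call. A minimal sketch of the corrected pattern:

from loguru import logger

# add() returns an int handler id; keep it only to detach the sink later.
handler_id = logger.add("MessagePool.log", level="INFO", colorize=True)
logger.info("logging works")
logger.remove(handler_id)  # detach the file sink when done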
