pull/584/head
Your Name 4 months ago
parent 63a5ad4fd5
commit c254ec1742

@ -531,6 +531,82 @@ print(f"Generated data: {generated_data}")
```
## Integrating External Agents
Integrating external agents from other agent frameworks is easy with swarms.
Steps:
1. Create a new class that inherits `Agent`
2. Create a `.run(task: str) -> str` method that runs the agent and returns the response.
3. The new Agent must return a string of the response. But you may add additional methods to save the output to JSON.
### Griptape Example
For example, here's an example on how to create an agent from griptape.
Heres how you can create a custom **Griptape** agent that integrates with the **Swarms** framework by inheriting from the `Agent` class in **Swarms** and overriding the `run(task: str) -> str` method.
```python
from swarms import (
Agent as SwarmsAgent,
) # Import the base Agent class from Swarms
from griptape.structures import Agent as GriptapeAgent
from griptape.tools import (
WebScraperTool,
FileManagerTool,
PromptSummaryTool,
)
# Create a custom agent class that inherits from SwarmsAgent
class GriptapeSwarmsAgent(SwarmsAgent):
def __init__(self, *args, **kwargs):
# Initialize the Griptape agent with its tools
self.agent = GriptapeAgent(
input="Load {{ args[0] }}, summarize it, and store it in a file called {{ args[1] }}.",
tools=[
WebScraperTool(off_prompt=True),
PromptSummaryTool(off_prompt=True),
FileManagerTool(),
],
*args,
**kwargs,
# Add additional settings
)
# Override the run method to take a task and execute it using the Griptape agent
def run(self, task: str) -> str:
# Extract URL and filename from task (you can modify this parsing based on task structure)
url, filename = task.split(
","
) # Example of splitting task string
# Execute the Griptape agent with the task inputs
result = self.agent.run(url.strip(), filename.strip())
# Return the final result as a string
return str(result)
# Example usage:
griptape_swarms_agent = GriptapeSwarmsAgent()
output = griptape_swarms_agent.run(
"https://griptape.ai, griptape.txt"
)
print(output)
```
### Key Components:
1. **GriptapeSwarmsAgent**: A custom class that inherits from the `SwarmsAgent` class and integrates the Griptape agent.
2. **run(task: str) -> str**: A method that takes a task string, processes it (e.g., splitting into a URL and filename), and runs the Griptape agent with the provided inputs.
3. **Griptape Tools**: The tools integrated into the Griptape agent (e.g., `WebScraperTool`, `PromptSummaryTool`, `FileManagerTool`) allow for web scraping, summarization, and file management.
You can now easily plug this custom Griptape agent into the **Swarms Framework** and use it to run tasks!
# Multi-Agent Orchestration:
Swarms was designed to facilitate the communication between many different and specialized agents from a vast array of other frameworks such as langchain, autogen, crew, and more.
@ -960,13 +1036,6 @@ Get onboarded now with the creator and lead maintainer of Swarms, Kye Gomez, who
## Documentation
Documentation is located here at: [docs.swarms.world](https://docs.swarms.world)
----
## Docker Instructions
- [Learn More Here About Deployments In Docker](https://docs.swarms.world/en/latest/docker_setup/)
-----
## Folder Structure
@ -978,7 +1047,7 @@ The swarms package has been meticlously crafted for extreme use-ability and unde
├── artifacts
├── memory
├── schemas
├── models
├── models -> swarm_models
├── prompts
├── structs
├── telemetry

@ -10,8 +10,7 @@ Welcome to the documentation for the llm section of the swarms package, designed
### Table of Contents
1. [OpenAI](#openai)
2. [HuggingFace](#huggingface)
3. [Google PaLM](#google-palm)
4. [Anthropic](#anthropic)
3. [Anthropic](#anthropic)
### 1. OpenAI (swarm_models.OpenAI)

@ -0,0 +1,45 @@
from swarms import (
Agent as SwarmsAgent,
) # Import the base Agent class from Swarms
from griptape.structures import Agent as GriptapeAgent
from griptape.tools import (
WebScraperTool,
FileManagerTool,
PromptSummaryTool,
)
# Create a custom agent class that inherits from SwarmsAgent
class GriptapeSwarmsAgent(SwarmsAgent):
def __init__(self, *args, **kwargs):
# Initialize the Griptape agent with its tools
self.agent = GriptapeAgent(
input="Load {{ args[0] }}, summarize it, and store it in a file called {{ args[1] }}.",
tools=[
WebScraperTool(off_prompt=True),
PromptSummaryTool(off_prompt=True),
FileManagerTool(),
],
*args,
**kwargs,
# Add additional settings
)
# Override the run method to take a task and execute it using the Griptape agent
def run(self, task: str) -> str:
# Extract URL and filename from task (you can modify this parsing based on task structure)
url, filename = task.split(
","
) # Example of splitting task string
# Execute the Griptape agent with the task inputs
result = self.agent.run(url.strip(), filename.strip())
# Return the final result as a string
return str(result)
# Example usage:
griptape_swarms_agent = GriptapeSwarmsAgent()
output = griptape_swarms_agent.run(
"https://griptape.ai, griptape.txt"
)
print(output)

@ -0,0 +1,135 @@
from typing import Any, Dict, List, Optional
from langchain import hub
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_community.tools.tavily_search import (
TavilySearchResults,
)
from langchain_openai import ChatOpenAI
from loguru import logger
class LangchainAgent:
def __init__(
self,
tavily_api_key: str,
llm_model: str = "gpt-3.5-turbo",
temperature: float = 0.7,
tavily_max_results: int = 1,
prompt_hub_url: str = "hwchase17/openai-tools-agent",
verbose: bool = True,
log_file: Optional[str] = None,
openai_api_key: Optional[str] = None,
) -> None:
"""
Initializes the LangchainAgent with given tools and parameters.
:param tavily_api_key: The API key for the Tavily search tool.
:param llm_model: The OpenAI language model to be used (default: "gpt-3.5-turbo").
:param temperature: Temperature for the language model (default: 0.7).
:param tavily_max_results: Maximum results for the Tavily search (default: 1).
:param prompt_hub_url: URL of the prompt hub to fetch the agent prompt (default: "hwchase17/openai-tools-agent").
:param verbose: If True, the agent will print detailed logs (default: True).
:param log_file: Optional log file to store logs using Loguru.
:param openai_api_key: Optional OpenAI API key for connecting to OpenAI services.
"""
# Setup Loguru for logging
if log_file:
logger.add(log_file, rotation="500 MB")
# Log initialization
logger.info(
"Initializing LangchainAgent with model: {}, temperature: {}",
llm_model,
temperature,
)
# Set up Tavily Search tool
logger.info(
"Setting up Tavily Search with max_results: {}",
tavily_max_results,
)
self.tavily_search = TavilySearchResults(
api_key=tavily_api_key, max_results=tavily_max_results
)
# Tools list (can be expanded)
self.tools = [self.tavily_search]
# Initialize the LLM (OpenAI Chat model)
logger.info("Initializing OpenAI model: {}", llm_model)
self.llm = ChatOpenAI(
model=llm_model,
temperature=temperature,
openai_api_key=openai_api_key,
)
# Fetch the prompt template from LangChain hub
logger.info(
"Fetching prompt template from {}", prompt_hub_url
)
self.prompt = hub.pull(prompt_hub_url)
# Create the OpenAI Tools agent
logger.info(
"Creating OpenAI Tools agent with fetched prompt and LLM."
)
self.agent = create_openai_tools_agent(
self.llm, self.tools, self.prompt
)
# Create AgentExecutor with the agent and tools
logger.info(
"Setting up AgentExecutor with verbose: {}", verbose
)
self.agent_executor = AgentExecutor(
agent=self.agent, tools=self.tools, verbose=verbose
)
def run(
self,
task: str,
chat_history: Optional[List[Dict[str, str]]] = None,
) -> str:
"""
Run the LangchainAgent with a specific task.
:param task: The task (query) for the agent to handle.
:param chat_history: Optional previous chat history for context (default: None).
:return: The result of the task.
"""
logger.info("Running agent with task: {}", task)
# Create input for agent execution
input_data: Dict[str, Any] = {"input": task}
if chat_history:
logger.info("Passing chat history for context.")
input_data["chat_history"] = chat_history
# Invoke the agent
logger.info("Invoking the agent executor.")
result = self.agent_executor.invoke(input_data)
# Log the result
logger.info(
"Task executed successfully. Result: {}", result["output"]
)
# Return the output from the agent
# return result["output"]
return result
# # Example usage:
# agent = LangchainAgent(
# tavily_api_key="your_tavily_api_key",
# llm_model="gpt-3.5-turbo",
# temperature=0.5,
# tavily_max_results=3,
# prompt_hub_url="your-prompt-url",
# verbose=True,
# log_file="agent.log",
# openai_api_key="your_openai_api_key"
# )
# result = agent.run("What is LangChain?")
# print(result)

@ -1,84 +0,0 @@
import os
from swarms.structs.agent import Agent
from swarm_models.popular_llms import OpenAIChat
from swarms.structs.agent_registry import AgentRegistry
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
)
# Registry of agents
agent_registry = AgentRegistry(
name="Swarms CLI",
description="A registry of agents for the Swarms CLI",
)
def create_agent(name: str, system_prompt: str, max_loops: int = 1):
"""
Create and initialize an agent with the given parameters.
Args:
name (str): The name of the agent.
system_prompt (str): The system prompt for the agent.
max_loops (int, optional): The maximum number of loops the agent can perform. Defaults to 1.
Returns:
Agent: The initialized agent.
"""
# Initialize the agent
agent = Agent(
agent_name=name,
system_prompt=system_prompt,
llm=model,
max_loops=max_loops,
autosave=True,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path=f"{name}.json",
user_name="swarms_corp",
retry_attempts=1,
context_length=200000,
# return_step_meta=True,
# disable_print_every_step=True,
# output_type="json",
interactive=True,
)
agent_registry.add(agent)
return agent
# Run the agents in the registry
def run_agent_by_name(name: str, task: str, *args, **kwargs):
"""
Run an agent by its name and perform a specified task.
Parameters:
- name (str): The name of the agent.
- task (str): The task to be performed by the agent.
- *args: Variable length argument list.
- **kwargs: Arbitrary keyword arguments.
Returns:
- output: The output of the agent's task.
"""
agent = agent_registry.get_agent_by_name(name)
output = agent.run(task, *args, **kwargs)
return output
# Test
out = create_agent("Accountant1", "Prepares financial statements")
print(out)

@ -1,8 +1,4 @@
from swarms.structs.agent import Agent
from swarms.structs.agent_process import (
AgentProcess,
AgentProcessQueue,
)
from swarms.structs.auto_swarm import AutoSwarm, AutoSwarmRouter
from swarms.structs.base_structure import BaseStructure
from swarms.structs.base_swarm import BaseSwarm
@ -72,8 +68,6 @@ from swarms.structs.yaml_model import (
__all__ = [
"Agent",
"AgentProcess",
"AgentProcessQueue",
"AutoSwarm",
"AutoSwarmRouter",
"BaseStructure",

@ -1,104 +0,0 @@
from datetime import datetime
from pydantic import BaseModel
from swarms.structs.omni_agent_types import AgentListType
from swarms.utils.loguru_logger import logger
from typing import Callable
class AgentProcess(BaseModel):
agent_id: int
agent_name: str
prompt: str
response: str = None
time: Callable = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
priority: int = 0
status: str = "Waiting"
pid: int = None
def set_pid(self, pid: int):
self.pid = pid
def get_pid(self):
return self.pid
def set_time(self, time: callable):
self.time = time
def get_time(self):
return self.time
class AgentProcessQueue:
"""
A class representing a queue of agent processes.
Attributes:
MAX_PID (int): The maximum process ID.
pid_pool (list): A list representing the availability of process IDs.
agent_process_queue (list): A list representing the queue of agent processes.
Methods:
add(agent_process): Adds an agent process to the queue.
print(): Prints the details of all agent processes in the queue.
Private Methods:
_get_available_pid(): Returns an available process ID from the pool.
"""
def __init__(self, max_pid: int = 1024):
self.MAX_PID = max_pid
self.pid_pool = [False for i in range(self.MAX_PID)]
self.agent_process_queue = (
[]
) # Currently use list to simulate queue
def add(self, agents: AgentListType):
"""
Adds an agent process to the queue.
Args:
agent_process (AgentProcess): The agent process to be added.
Returns:
None
"""
for agent in agents:
agent_process = AgentProcess(
agent_id=agent.id,
agent_name=agent.agent_name,
prompt=agent.short_memory.return_history_as_string(),
)
pid = self._get_available_pid()
if pid is None:
logger.warning("No available PID")
return
agent_process.set_pid(pid)
agent_process.set_status("Waiting")
self.agent_process_queue.append(agent_process)
def print(self):
"""
Prints the details of all agent processes in the queue.
Returns:
None
"""
for agent_process in self.agent_process_queue:
logger.info(
f"| Agent-process ID: {agent_process.get_pid()} |"
f" Status: {agent_process.get_status()} |"
)
def _get_available_pid(self):
"""
Returns an available process ID from the pool.
Returns:
int or None: The available process ID, or None if no ID is available.
"""
for i, used in enumerate(self.pid_pool):
if not used:
return i
return None

@ -39,6 +39,18 @@ class AgentRegistrySchema(BaseModel):
class AgentRegistry:
"""
A class for managing a registry of agents.
Attributes:
name (str): The name of the registry.
description (str): A description of the registry.
return_json (bool): Indicates whether to return data in JSON format.
auto_save (bool): Indicates whether to automatically save changes to the registry.
agents (Dict[str, Agent]): A dictionary of agents in the registry, keyed by agent name.
lock (Lock): A lock for thread-safe operations on the registry.
agent_registry (AgentRegistrySchema): The schema for the agent registry.
"""
def __init__(
self,
@ -50,6 +62,16 @@ class AgentRegistry:
*args,
**kwargs,
):
"""
Initializes the AgentRegistry.
Args:
name (str, optional): The name of the registry. Defaults to "Agent Registry".
description (str, optional): A description of the registry. Defaults to "A registry for managing agents.".
agents (Optional[List[Agent]], optional): A list of agents to initially add to the registry. Defaults to None.
return_json (bool, optional): Indicates whether to return data in JSON format. Defaults to True.
auto_save (bool, optional): Indicates whether to automatically save changes to the registry. Defaults to False.
"""
self.name = name
self.description = description
self.return_json = return_json
@ -282,6 +304,12 @@ class AgentRegistry:
raise e
def agent_to_py_model(self, agent: Agent):
"""
Converts an agent to a Pydantic model.
Args:
agent (Agent): The agent to convert.
"""
agent_name = agent.agent_name
agent_description = (
agent.description

@ -5,7 +5,6 @@ from swarms.utils.data_to_text import (
json_to_text,
txt_to_text,
)
from swarms.utils.exponential_backoff import ExponentialBackoffMixin
from swarms.utils.file_processing import (
load_json,
sanitize_file_path,
@ -28,7 +27,6 @@ __all__ = [
"data_to_text",
"json_to_text",
"txt_to_text",
"ExponentialBackoffMixin",
"load_json",
"sanitize_file_path",
"zip_workspace",

@ -1,101 +0,0 @@
import os
import json
from typing import Dict, List
import requests
from loguru import logger
from swarms.structs.agent import Agent
def add_agent_to_marketplace(
name: str,
agent: str,
language: str,
description: str,
use_cases: List[Dict[str, str]],
requirements: List[Dict[str, str]],
tags: str,
) -> Dict[str, str]:
"""
Add an agent to the marketplace.
Args:
name (str): The name of the agent.
agent (str): The agent code.
language (str): The programming language of the agent.
description (str): The description of the agent.
use_cases (List[Dict[str, str]]): The list of use cases for the agent.
requirements (List[Dict[str, str]]): The list of requirements for the agent.
tags (str): The tags for the agent.
api_key (str): The API key for authentication.
Returns:
Dict[str, str]: The response from the API.
Raises:
requests.exceptions.RequestException: If there is an error making the API request.
"""
logger.info("Adding agent to marketplace...")
url = "https://swarms.world/api/add-agent"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {os.getenv("SWARMS_API_KEY")}",
}
data = {
"name": name,
"agent": agent,
"description": description,
"language": language,
"useCases": use_cases,
"requirements": requirements,
"tags": tags,
}
try:
response = requests.post(
url, headers=headers, data=json.dumps(data)
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f"Error making API request: {e}")
def add_agent_to_marketplace_sync(
agent: Agent,
use_cases: List[Dict[str, str]],
requirements: List[Dict[str, str]],
tags: str,
):
return add_agent_to_marketplace(
name=agent.agent_name,
description=agent.description,
language="python",
use_cases=use_cases,
requirements=requirements,
tags=tags,
)
# # Example usage
# async def main():
# name = "Example Agent"
# agent = "This is an example agent from an API route."
# description = "Description of the agent."
# language = "python"
# use_cases = [
# {"title": "Use case 1", "description": "Description of use case 1"},
# {"title": "Use case 2", "description": "Description of use case 2"}
# ]
# requirements = [
# {"package": "pip", "installation": "pip install"},
# {"package": "pip3", "installation": "pip3 install"}
# ]
# tags = "example, agent"
# api_key = "YOUR_API_KEY"
# result = await add_agent_to_marketplace(name, agent, language, description, use_cases, requirements, tags, api_key)
# print(result)
# asyncio.run(main())

@ -6,6 +6,16 @@ from typing import List
async def async_create_file(file_path: str, content: str) -> None:
"""
Asynchronously creates a file at the specified path and writes the given content to it.
Args:
file_path (str): The path where the file will be created.
content (str): The content to be written to the file.
Returns:
None
"""
async with aio_open(file_path, "w") as file:
await file.write(content)
@ -13,11 +23,19 @@ async def async_create_file(file_path: str, content: str) -> None:
async def create_multiple_files(
file_paths: List[str], contents: List[str]
) -> None:
"""
Asynchronously creates multiple files at the specified paths and writes the corresponding content to each file.
Args:
file_paths (List[str]): A list of paths where the files will be created.
contents (List[str]): A list of content to be written to each file, corresponding to the file paths.
Returns:
None
"""
tasks = [
async_create_file(
(file_path, content)
for file_path, content in zip(file_paths, contents)
)
async_create_file(file_path, content)
for file_path, content in zip(file_paths, contents)
]
await asyncio.gather(*tasks)
@ -26,10 +44,10 @@ async def create_file_with_directory(
file_path: str, content: str
) -> None:
"""
Creates a file with the specified directory path and content.
Creates a file with the specified directory path and content. If the directory does not exist, it is created.
Args:
file_path (str): The path of the file to be created.
file_path (str): The path of the file to be created, including the directory.
content (str): The content to be written to the file.
Returns:

@ -1,7 +1,13 @@
import concurrent
import concurrent.futures
from typing import List, Tuple, Any, Dict, Union, Callable
def execute_concurrently(callable_functions, max_workers=5):
def execute_concurrently(
callable_functions: List[
Tuple[Callable, Tuple[Any, ...], Dict[str, Any]]
],
max_workers: int = 5,
) -> List[Union[Any, Exception]]:
"""
Executes callable functions concurrently using multithreading.
@ -16,7 +22,12 @@ def execute_concurrently(callable_functions, max_workers=5):
"""
results = [None] * len(callable_functions)
def worker(fn, args, kwargs, index):
def worker(
fn: Callable,
args: Tuple[Any, ...],
kwargs: Dict[str, Any],
index: int,
) -> None:
try:
result = fn(*args, **kwargs)
results[index] = result

@ -1,47 +0,0 @@
import logging
from abc import ABC
from dataclasses import dataclass
from tenacity import Retrying, stop_after_attempt, wait_exponential
@dataclass
class ExponentialBackoffMixin(ABC):
"""
A mixin class that provides exponential backoff functionality.
"""
min_retry_delay: float = 2
"""
The minimum delay between retries in seconds.
"""
max_retry_delay: float = 10
"""
The maximum delay between retries in seconds.
"""
max_attempts: int = 10
"""
The maximum number of retry attempts.
"""
def after_hook(s: str) -> None:
return logging.warning(s)
"""
A callable that is executed after each retry attempt.
"""
def retrying(self) -> Retrying:
"""
Returns a Retrying object configured with the exponential backoff settings.
"""
return Retrying(
wait=wait_exponential(
min=self.min_retry_delay, max=self.max_retry_delay
),
stop=stop_after_attempt(self.max_attempts),
reraise=True,
after=self.after_hook,
)
Loading…
Cancel
Save