Clean up API route docs

pull/795/head
Kye Gomez 1 month ago
parent 1899c807eb
commit 9b07f448ae

@@ -0,0 +1,64 @@
from dotenv import load_dotenv
from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT,
)
from swarms.utils.str_to_dict import str_to_dict
load_dotenv()
tools = [
{
"type": "function",
"function": {
"name": "get_stock_price",
"description": "Retrieve the current stock price and related information for a specified company.",
"parameters": {
"type": "object",
"properties": {
"ticker": {
"type": "string",
"description": "The stock ticker symbol of the company, e.g. AAPL for Apple Inc.",
},
"include_history": {
"type": "boolean",
"description": "Indicates whether to include historical price data along with the current price.",
},
"time": {
"type": "string",
"format": "date-time",
"description": "Optional parameter to specify the time for which the stock data is requested, in ISO 8601 format.",
},
},
"required": [
"ticker",
"include_history",
"time",
],
},
},
}
]
# Initialize the agent
agent = Agent(
agent_name="Financial-Analysis-Agent",
agent_description="Personal finance advisor agent",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
max_loops=1,
tools_list_dictionary=tools,
)
out = agent.run(
"What is the current stock price for Apple Inc. (AAPL)? Include historical price data.",
)
print(out)
print(type(out))
print(str_to_dict(out))
print(type(str_to_dict(out)))

@@ -21,7 +21,7 @@ load_dotenv()
# Retrieve API key securely from .env
API_KEY = os.getenv("SWARMS_API_KEY")
-BASE_URL = "https://swarms-api-285321057562.us-east1.run.app"
+BASE_URL = "https://api.swarms.world"
# Headers for secure API communication
headers = {"x-api-key": API_KEY, "Content-Type": "application/json"}

@@ -19,7 +19,7 @@ load_dotenv()
# Retrieve API key securely from .env
API_KEY = os.getenv("SWARMS_API_KEY")
-BASE_URL = "https://swarms-api-285321057562.us-east1.run.app"
+BASE_URL = "https://api.swarms.world"
# Headers for secure API communication
headers = {"x-api-key": API_KEY, "Content-Type": "application/json"}

@@ -20,7 +20,7 @@ load_dotenv()
# Retrieve API key securely from .env
API_KEY = os.getenv("SWARMS_API_KEY")
-BASE_URL = "https://swarms-api-285321057562.us-east1.run.app"
+BASE_URL = "https://api.swarms.world"
# Headers for secure API communication
headers = {"x-api-key": API_KEY, "Content-Type": "application/json"}

@@ -548,6 +548,82 @@ agent.run(
print(agent.system_prompt)
```
## Agent Structured Outputs
- Define a structured output schema for the agent as a `List[Dict]`
- Pass the schema in through the `tools_list_dictionary` parameter
- The agent returns the structured output as a JSON string
- Use the `str_to_dict` function to convert the output to a dictionary
```python
from dotenv import load_dotenv
from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT,
)
from swarms.utils.str_to_dict import str_to_dict
load_dotenv()
tools = [
{
"type": "function",
"function": {
"name": "get_stock_price",
"description": "Retrieve the current stock price and related information for a specified company.",
"parameters": {
"type": "object",
"properties": {
"ticker": {
"type": "string",
"description": "The stock ticker symbol of the company, e.g. AAPL for Apple Inc.",
},
"include_history": {
"type": "boolean",
"description": "Indicates whether to include historical price data along with the current price.",
},
"time": {
"type": "string",
"format": "date-time",
"description": "Optional parameter to specify the time for which the stock data is requested, in ISO 8601 format.",
},
},
"required": [
"ticker",
"include_history",
"time",
],
},
},
}
]
# Initialize the agent
agent = Agent(
agent_name="Financial-Analysis-Agent",
agent_description="Personal finance advisor agent",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
max_loops=1,
tools_list_dictionary=tools,
)
out = agent.run(
"What is the current stock price for Apple Inc. (AAPL)? Include historical price data.",
)
print(out)
print(type(out))
print(str_to_dict(out))
print(type(str_to_dict(out)))
```
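When the model responds with a tool call, `out` is the call's arguments serialized as JSON, which is why `str_to_dict` is used above. As a rough sketch of what the parsed result could look like (the keys follow the schema; the values here are illustrative, not guaranteed):

```python
# Illustrative only; actual values depend on the model's response.
parsed = str_to_dict(out)
# parsed might look like:
# {"ticker": "AAPL", "include_history": True, "time": "2025-03-04T00:00:00Z"}
print(parsed["ticker"])  # -> AAPL
```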
## Best Practices

@@ -2,7 +2,7 @@
*Enterprise-grade Agent Swarm Management API*
-**Base URL**: `https://swarms-api-285321057562.us-east1.run.app`
+**Base URL**: `https://api.swarms.world`
**API Key Management**: [https://swarms.world/platform/api-keys](https://swarms.world/platform/api-keys)
**Documentation Version**: 1.0.0
**Last Updated**: March 4, 2025
@@ -109,7 +109,7 @@ Check if the API service is available and functioning correctly.
**Example Request**:
```bash
-curl -X GET "https://swarms-api-285321057562.us-east1.run.app/health" \
+curl -X GET "https://api.swarms.world/health" \
-H "x-api-key: your_api_key_here"
```
@@ -342,7 +342,7 @@ Schedule a swarm to run at a specific time.
**Example Request**:
```bash
-curl -X POST "https://swarms-api-285321057562.us-east1.run.app/v1/swarm/schedule" \
+curl -X POST "https://api.swarms.world/v1/swarm/schedule" \
-H "x-api-key: your_api_key_here" \
-H "Content-Type: application/json" \
-d '{
@@ -377,7 +377,7 @@ Retrieve all scheduled swarm jobs.
**Example Request**:
```bash
-curl -X GET "https://swarms-api-285321057562.us-east1.run.app/v1/swarm/schedule" \
+curl -X GET "https://api.swarms.world/v1/swarm/schedule" \
-H "x-api-key: your_api_key_here"
```
@@ -418,7 +418,7 @@ Cancel a previously scheduled swarm job.
**Example Request**:
```bash
-curl -X DELETE "https://swarms-api-285321057562.us-east1.run.app/v1/swarm/schedule/swarm_daily-market-analysis_1709563245" \
+curl -X DELETE "https://api.swarms.world/v1/swarm/schedule/swarm_daily-market-analysis_1709563245" \
-H "x-api-key: your_api_key_here"
```
@@ -441,7 +441,7 @@ Retrieve logs of API requests made with your API key.
**Example Request**:
```bash
-curl -X GET "https://swarms-api-285321057562.us-east1.run.app/v1/swarm/logs" \
+curl -X GET "https://api.swarms.world/v1/swarm/logs" \
-H "x-api-key: your_api_key_here"
```
@@ -527,7 +527,7 @@ import json
from datetime import datetime, timedelta
# API Configuration
-API_BASE_URL = "https://swarms-api-285321057562.us-east1.run.app"
+API_BASE_URL = "https://api.swarms.world"
API_KEY = "your_api_key_here"
HEADERS = {
"x-api-key": API_KEY,
@@ -695,7 +695,7 @@ import os
from datetime import datetime
# API Configuration
-API_BASE_URL = "https://swarms-api-285321057562.us-east1.run.app"
+API_BASE_URL = "https://api.swarms.world"
API_KEY = os.environ.get("SWARMS_API_KEY")
HEADERS = {
"x-api-key": API_KEY,
@@ -913,7 +913,7 @@ import * as dotenv from 'dotenv';
dotenv.config();
// API Configuration
-const API_BASE_URL = "https://swarms-api-285321057562.us-east1.run.app";
+const API_BASE_URL = "https://api.swarms.world";
const API_KEY = process.env.SWARMS_API_KEY;
// Define interfaces for type safety
@@ -1212,7 +1212,7 @@ import * as dotenv from 'dotenv';
dotenv.config();
// API Configuration
-const API_BASE_URL = "https://swarms-api-285321057562.us-east1.run.app";
+const API_BASE_URL = "https://api.swarms.world";
const API_KEY = process.env.SWARMS_API_KEY;
// Define interfaces

@@ -9,7 +9,7 @@ from swarms_tools import coin_gecko_coin_api
load_dotenv()
API_KEY = os.getenv("SWARMS_API_KEY")
-BASE_URL = "https://swarms-api-285321057562.us-east1.run.app"
+BASE_URL = "https://api.swarms.world"
headers = {"x-api-key": API_KEY, "Content-Type": "application/json"}

@@ -7,7 +7,7 @@ load_dotenv()
# Retrieve API key securely from .env
API_KEY = os.getenv("SWARMS_API_KEY")
-BASE_URL = "https://swarms-api-285321057562.us-east1.run.app"
+BASE_URL = "https://api.swarms.world"
# Headers for secure API communication
headers = {"x-api-key": API_KEY, "Content-Type": "application/json"}

@@ -7,7 +7,7 @@ load_dotenv()
# Retrieve API key securely from .env
API_KEY = os.getenv("SWARMS_API_KEY")
-BASE_URL = "https://swarms-api-285321057562.us-east1.run.app"
+BASE_URL = "https://api.swarms.world"
# Headers for secure API communication
headers = {"x-api-key": API_KEY, "Content-Type": "application/json"}

@@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
-version = "7.5.5"
+version = "7.6.0"
description = "Swarms - TGSC"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]

@@ -1,3 +1,15 @@
from swarms.agents.consistency_agent import SelfConsistencyAgent
# from swarms.agents.tool_agent import ToolAgent
from swarms.agents.create_agents_from_yaml import (
create_agents_from_yaml,
)
from swarms.agents.i_agent import IterativeReflectiveExpansion
from swarms.agents.reasoning_agents import (
ReasoningAgentRouter,
agent_types,
)
from swarms.agents.reasoning_duo import ReasoningDuo
from swarms.structs.stopping_conditions import (
check_cancelled,
check_complete,
@@ -11,15 +23,6 @@ from swarms.structs.stopping_conditions import (
check_success,
)
# from swarms.agents.tool_agent import ToolAgent
from swarms.agents.create_agents_from_yaml import (
create_agents_from_yaml,
)
from swarms.agents.i_agent import IterativeReflectiveExpansion
from swarms.agents.consistency_agent import SelfConsistencyAgent
from swarms.agents.reasoning_duo import ReasoningDuo
__all__ = [
# "ToolAgent",
"check_done",
@@ -36,4 +39,6 @@ __all__ = [
"IterativeReflectiveExpansion",
"SelfConsistencyAgent",
"ReasoningDuo",
"ReasoningAgentRouter",
"agent_types",
]

@@ -84,6 +84,7 @@ from swarms.structs.swarms_api import (
)
from swarms.structs.de_hallucination_swarm import DeHallucinationSwarm
from swarms.structs.deep_research_swarm import DeepResearchSwarm
__all__ = [
"Agent",
@@ -159,4 +160,5 @@ __all__ = [
"AgentsBuilder",
"MALT",
"DeHallucinationSwarm",
"DeepResearchSwarm",
]

@@ -83,7 +83,15 @@ def exists(val):
# Agent output types
# agent_output_type = Union[BaseModel, dict, str]
agent_output_type = Literal[
-"string", "str", "list", "json", "dict", "yaml", "json_schema"
+"string",
+"str",
+"list",
+"json",
+"dict",
+"yaml",
+"json_schema",
+"memory-list",
+"memory-dict",
]
ToolUsageType = Union[BaseModel, Dict[str, Any]]
@@ -461,10 +469,11 @@ class Agent:
self.role = role
self.no_print = no_print
self.tools_list_dictionary = tools_list_dictionary
# Initialize the short term memory
self.short_memory = Conversation(
system_prompt=system_prompt,
-time_enabled=True,
+time_enabled=False,
user=user_name,
rules=rules,
*args,
@@ -586,13 +595,36 @@ class Agent:
from swarms.utils.litellm_wrapper import LiteLLM
if self.model_name is None:
-raise ValueError("Model name cannot be None")
+# raise ValueError("Model name cannot be None")
logger.warning(
"Model name is not provided, using gpt-4o-mini. You can configure any model from litellm if desired."
)
self.model_name = "gpt-4o-mini"
try:
if self.llm_args is not None:
llm = LiteLLM(
model_name=self.model_name, **self.llm_args
)
elif self.tools_list_dictionary is not None:
length_of_tools_list_dictionary = len(
self.tools_list_dictionary
)
if length_of_tools_list_dictionary > 0:
parallel_tool_calls = True
llm = LiteLLM(
model_name=self.model_name,
temperature=self.temperature,
max_tokens=self.max_tokens,
system_prompt=self.system_prompt,
tools_list_dictionary=self.tools_list_dictionary,
tool_choice="auto",
parallel_tool_calls=parallel_tool_calls,
)
else:
llm = LiteLLM(
model_name=self.model_name,
@@ -1107,6 +1139,14 @@ class Agent:
return yaml.safe_dump(
self.agent_output.model_dump(), sort_keys=False
)
elif self.output_type == "memory-list":
return self.short_memory.return_messages_as_list()
elif self.output_type == "memory-dict":
return (
self.short_memory.return_messages_as_dictionary()
)
elif self.return_history is True:
history = self.short_memory.get_str()
@@ -1639,7 +1679,7 @@ class Agent:
):
self.short_memory = Conversation(
system_prompt=self.system_prompt,
-time_enabled=True,
+time_enabled=False,
user=self.user_name,
rules=self.rules,
)

@@ -1,13 +1,11 @@
import os
-from typing import List, Optional, Tuple
+from typing import Any, List, Optional, Tuple
from loguru import logger
from pydantic import BaseModel, Field
from swarms.structs.agent import Agent
from swarms.utils.any_to_str import any_to_str
from swarms.utils.function_caller_model import OpenAIFunctionCaller
from swarms.utils.litellm_tokenizer import count_tokens
BOSS_SYSTEM_PROMPT = """
# Swarm Intelligence Orchestrator
@@ -148,13 +146,18 @@ class AgentsBuilder:
description: str = "This is a swarm that creates swarms",
verbose: bool = True,
max_loops: int = 1,
model_name: str = "gpt-4o",
return_dictionary: bool = True,
system_prompt: str = BOSS_SYSTEM_PROMPT,
):
self.name = name
self.description = description
self.verbose = verbose
self.max_loops = max_loops
self.agents_pool = []
self.model_name = model_name
self.return_dictionary = return_dictionary
self.system_prompt = system_prompt
logger.info(
f"Initialized AutoSwarmBuilder: {name} {description}"
)
@@ -174,11 +177,9 @@ class AgentsBuilder:
The output from the swarm's execution
"""
logger.info(f"Running swarm on task: {task}")
-agents, tokens = self._create_agents(
-task, image_url, *args, **kwargs
-)
+agents = self._create_agents(task, image_url, *args, **kwargs)
-return agents, tokens
+return agents
def _create_agents(self, task: str, *args, **kwargs):
"""Create the necessary agents for a task.
@@ -193,27 +194,32 @@ class AgentsBuilder:
"""
logger.info("Creating agents for task")
model = OpenAIFunctionCaller(
-system_prompt=BOSS_SYSTEM_PROMPT,
+system_prompt=self.system_prompt,
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.1,
base_model=Agents,
-model_name="gpt-4o",
+model_name=self.model_name,
max_tokens=8192,
)
agents_dictionary = model.run(task)
print(agents_dictionary)
print(type(agents_dictionary))
logger.info("Agents successfully created")
logger.info(f"Agents: {len(agents_dictionary.agents)}")
total_tokens = any_to_str(agents_dictionary)
if self.return_dictionary:
logger.info("Returning dictionary")
tokens = count_tokens(total_tokens)
# logger.info(f"Tokens: {tokens}")
# # Convert dictionary to SwarmConfig if needed
# if isinstance(agents_dictionary, dict):
# agents_dictionary = Agents(**agents_dictionary)
# Convert swarm config to dictionary
agents_dictionary = agents_dictionary.model_dump()
return agents_dictionary
else:
logger.info("Returning agents")
return self.create_agents(agents_dictionary)
def create_agents(self, agents_dictionary: Any):
# Create agents from config
agents = []
for agent_config in agents_dictionary.agents:
@@ -235,7 +241,7 @@ class AgentsBuilder:
)
agents.append(agent)
-return agents, tokens
+return agents
def build_agent(
self,
@@ -278,3 +284,10 @@ class AgentsBuilder:
)
return agent
# if __name__ == "__main__":
# builder = AgentsBuilder(model_name="gpt-4o")
# agents = builder.run("Create a swarm that can write a book about the history of the world")
# print(agents)
# print(type(agents))

@@ -137,7 +137,7 @@ class BaseSwarm(ABC):
# Initialize conversation
self.conversation = Conversation(
-time_enabled=True, rules=self.rules, *args, **kwargs
+time_enabled=False, rules=self.rules, *args, **kwargs
)
# Handle callbacks

@@ -8,6 +8,7 @@ from typing import TYPE_CHECKING
from swarms.utils.any_to_str import any_to_str
from swarms.utils.formatter import formatter
from swarms.utils.litellm_tokenizer import count_tokens
import threading
if TYPE_CHECKING:
from swarms.structs.agent import (
@@ -17,53 +18,25 @@ if TYPE_CHECKING:
class Conversation(BaseStructure):
"""
A class structure to represent a conversation in a chatbot. This class is used to store the conversation history.
And, it can be used to save the conversation history to a file, load the conversation history from a file, and
display the conversation history. We can also use this class to add the conversation history to a database, query
the conversation history from a database, delete the conversation history from a database, update the conversation
history from a database, and get the conversation history from a database.
Args:
time_enabled (bool): Whether to enable timestamps for the conversation history. Default is False.
database (AbstractDatabase): The database to use for storing the conversation history. Default is None.
autosave (bool): Whether to autosave the conversation history to a file. Default is None.
save_filepath (str): The filepath to save the conversation history to. Default is None.
Methods:
add(role: str, content: str): Add a message to the conversation history.
delete(index: str): Delete a message from the conversation history.
update(index: str, role, content): Update a message in the conversation history.
query(index: str): Query a message in the conversation history.
search(keyword: str): Search for a message in the conversation history.
display_conversation(detailed: bool = False): Display the conversation history.
export_conversation(filename: str): Export the conversation history to a file.
import_conversation(filename: str): Import a conversation history from a file.
count_messages_by_role(): Count the number of messages by role.
return_history_as_string(): Return the conversation history as a string.
save_as_json(filename: str): Save the conversation history as a JSON file.
load_from_json(filename: str): Load the conversation history from a JSON file.
search_keyword_in_conversation(keyword: str): Search for a keyword in the conversation history.
pretty_print_conversation(messages): Pretty print the conversation history.
add_to_database(): Add the conversation history to the database.
query_from_database(query): Query the conversation history from the database.
delete_from_database(): Delete the conversation history from the database.
update_from_database(): Update the conversation history from the database.
get_from_database(): Get the conversation history from the database.
execute_query_from_database(query): Execute a query on the database.
fetch_all_from_database(): Fetch all from the database.
fetch_one_from_database(): Fetch one from the database.
Examples:
>>> from swarms import Conversation
>>> conversation = Conversation()
>>> conversation.add("user", "Hello, how are you?")
>>> conversation.add("assistant", "I am doing well, thanks.")
>>> conversation.display_conversation()
user: Hello, how are you?
assistant: I am doing well, thanks.
A class to manage a conversation history, allowing for the addition, deletion,
and retrieval of messages, as well as saving and loading the conversation
history in various formats.
Attributes:
system_prompt (Optional[str]): The system prompt for the conversation.
time_enabled (bool): Flag to enable time tracking for messages.
autosave (bool): Flag to enable automatic saving of conversation history.
save_filepath (str): File path for saving the conversation history.
tokenizer (Any): Tokenizer for counting tokens in messages.
context_length (int): Maximum number of tokens allowed in the conversation history.
rules (str): Rules for the conversation.
custom_rules_prompt (str): Custom prompt for rules.
user (str): The user identifier for messages.
auto_save (bool): Flag to enable auto-saving of conversation history.
save_as_yaml (bool): Flag to save conversation history as YAML.
save_as_json_bool (bool): Flag to save conversation history as JSON.
token_count (bool): Flag to enable token counting for messages.
conversation_history (list): List to store the history of messages.
"""
def __init__(
@@ -84,6 +57,24 @@ class Conversation(BaseStructure):
*args,
**kwargs,
):
"""
Initializes the Conversation object with the provided parameters.
Args:
system_prompt (Optional[str]): The system prompt for the conversation.
time_enabled (bool): Flag to enable time tracking for messages.
autosave (bool): Flag to enable automatic saving of conversation history.
save_filepath (str): File path for saving the conversation history.
tokenizer (Any): Tokenizer for counting tokens in messages.
context_length (int): Maximum number of tokens allowed in the conversation history.
rules (str): Rules for the conversation.
custom_rules_prompt (str): Custom prompt for rules.
user (str): The user identifier for messages.
auto_save (bool): Flag to enable auto-saving of conversation history.
save_as_yaml (bool): Flag to save conversation history as YAML.
save_as_json_bool (bool): Flag to save conversation history as JSON.
token_count (bool): Flag to enable token counting for messages.
"""
super().__init__()
self.system_prompt = system_prompt
self.time_enabled = time_enabled
@@ -121,53 +112,68 @@ class Conversation(BaseStructure):
*args,
**kwargs,
):
"""Add a message to the conversation history
"""Add a message to the conversation history.
Args:
role (str): The role of the speaker
content (str): The content of the message
role (str): The role of the speaker (e.g., 'User', 'System').
content (Union[str, dict, list]): The content of the message to be added.
"""
now = datetime.datetime.now()
timestamp = now.strftime("%Y-%m-%d %H:%M:%S")
tokens = count_tokens(any_to_str(content))
# Base message with role
message = {
"role": role,
}
# Handle different content types
if isinstance(content, dict) or isinstance(content, list):
message = {
"role": role,
"content": content,
"token_count": int(tokens),
}
message["content"] = content
else:
message = {
"role": role,
"content": f"Time: {timestamp} \n {content}",
"token_count": int(tokens),
}
message["content"] = f"Time: {timestamp} \n {content}"
# Add the message to history immediately without waiting for token count
self.conversation_history.append(message)
if self.autosave:
# If token counting is enabled, do it in a separate thread
if self.token_count is True:
# Define a function to count tokens and update the message
def count_tokens_thread():
tokens = count_tokens(any_to_str(content))
# Update the message that's already in the conversation history
message["token_count"] = int(tokens)
# If autosave is enabled, save after token count is updated
if self.autosave:
self.save_as_json(self.save_filepath)
# Start a new thread for token counting
token_thread = threading.Thread(
target=count_tokens_thread
)
token_thread.daemon = (
True # Make thread terminate when main program exits
)
token_thread.start()
elif self.autosave:
# If token counting is disabled but autosave is enabled, save immediately
self.save_as_json(self.save_filepath)
def delete(self, index: str):
"""Delete a message from the conversation history
"""Delete a message from the conversation history.
Args:
index (str): index of the message to delete
index (str): Index of the message to delete.
"""
self.conversation_history.pop(index)
def update(self, index: str, role, content):
"""Update a message in the conversation history
"""Update a message in the conversation history.
Args:
index (str): index of the message to update
role (_type_): role of the speaker
content (_type_): content of the message
index (str): Index of the message to update.
role (str): Role of the speaker.
content (Union[str, dict]): New content of the message.
"""
self.conversation_history[index] = {
"role": role,
@@ -175,24 +181,24 @@ class Conversation(BaseStructure):
}
def query(self, index: str):
"""Query a message in the conversation history
"""Query a message in the conversation history.
Args:
index (str): index of the message to query
index (str): Index of the message to query.
Returns:
str: the message
dict: The message with its role and content.
"""
return self.conversation_history[index]
def search(self, keyword: str):
"""Search for a message in the conversation history
"""Search for a message in the conversation history.
Args:
keyword (str): Keyword to search for
keyword (str): Keyword to search for.
Returns:
str: description
list: List of messages containing the keyword.
"""
return [
msg
@@ -201,10 +207,10 @@ class Conversation(BaseStructure):
]
def display_conversation(self, detailed: bool = False):
"""Display the conversation history
"""Display the conversation history.
Args:
detailed (bool, optional): detailed. Defaults to False.
detailed (bool, optional): Flag to display detailed information. Defaults to False.
"""
for message in self.conversation_history:
formatter.print_panel(
@@ -212,20 +218,20 @@ class Conversation(BaseStructure):
)
def export_conversation(self, filename: str, *args, **kwargs):
"""Export the conversation history to a file
"""Export the conversation history to a file.
Args:
filename (str): filename to export to
filename (str): Filename to export to.
"""
with open(filename, "w") as f:
for message in self.conversation_history:
f.write(f"{message['role']}: {message['content']}\n")
def import_conversation(self, filename: str):
"""Import a conversation history from a file
"""Import a conversation history from a file.
Args:
filename (str): filename to import from
filename (str): Filename to import from.
"""
with open(filename) as f:
for line in f:
@@ -233,7 +239,11 @@ class Conversation(BaseStructure):
self.add(role, content.strip())
def count_messages_by_role(self):
"""Count the number of messages by role"""
"""Count the number of messages by role.
Returns:
dict: A dictionary with counts of messages by role.
"""
counts = {
"system": 0,
"user": 0,
@@ -245,10 +255,10 @@ class Conversation(BaseStructure):
return counts
def return_history_as_string(self):
"""Return the conversation history as a string
"""Return the conversation history as a string.
Returns:
str: the conversation history
str: The conversation history formatted as a string.
"""
return "\n".join(
[
@@ -258,39 +268,41 @@ class Conversation(BaseStructure):
)
def get_str(self):
"""Get the conversation history as a string.
Returns:
str: The conversation history.
"""
return self.return_history_as_string()
def save_as_json(self, filename: str = None):
"""Save the conversation history as a JSON file
"""Save the conversation history as a JSON file.
Args:
filename (str): Save the conversation history as a JSON file
filename (str): Filename to save the conversation history.
"""
# Create the directory if it does not exist
# os.makedirs(os.path.dirname(filename), exist_ok=True)
if filename is not None:
with open(filename, "w") as f:
json.dump(self.conversation_history, f)
def load_from_json(self, filename: str):
"""Load the conversation history from a JSON file
"""Load the conversation history from a JSON file.
Args:
filename (str): filename to load from
filename (str): Filename to load from.
"""
# Load the conversation history from a JSON file
if filename is not None:
with open(filename) as f:
self.conversation_history = json.load(f)
def search_keyword_in_conversation(self, keyword: str):
"""Search for a keyword in the conversation history
"""Search for a keyword in the conversation history.
Args:
keyword (str): keyword to search for
keyword (str): Keyword to search for.
Returns:
str: description
list: List of messages containing the keyword.
"""
return [
msg
@@ -299,10 +311,10 @@ class Conversation(BaseStructure):
]
def pretty_print_conversation(self, messages):
"""Pretty print the conversation history
"""Pretty print the conversation history.
Args:
messages (str): messages to print
messages (list): List of messages to print.
"""
role_to_color = {
"system": "red",
@@ -383,15 +395,31 @@ class Conversation(BaseStructure):
self.conversation_history = truncated_history
def clear(self):
"""Clear the conversation history."""
self.conversation_history = []
def to_json(self):
"""Convert the conversation history to a JSON string.
Returns:
str: The conversation history as a JSON string.
"""
return json.dumps(self.conversation_history)
def to_dict(self):
"""Convert the conversation history to a dictionary.
Returns:
list: The conversation history as a list of dictionaries.
"""
return self.conversation_history
def to_yaml(self):
"""Convert the conversation history to a YAML string.
Returns:
str: The conversation history as a YAML string.
"""
return yaml.dump(self.conversation_history)
def get_visible_messages(self, agent: "Agent", turn: int):
@@ -422,17 +450,30 @@ class Conversation(BaseStructure):
return visible_messages
def get_last_message_as_string(self):
# fetch the last message from the conversation history with the agent name and the message of the agent
"""Fetch the last message from the conversation history.
Returns:
str: The last message formatted as 'role: content'.
"""
return f"{self.conversation_history[-1]['role']}: {self.conversation_history[-1]['content']}"
def return_messages_as_list(self):
# we must concat the role and the content of the message
"""Return the conversation messages as a list of formatted strings.
Returns:
list: List of messages formatted as 'role: content'.
"""
return [
f"{message['role']}: {message['content']}"
for message in self.conversation_history
]
def return_messages_as_dictionary(self):
"""Return the conversation messages as a list of dictionaries.
Returns:
list: List of dictionaries containing role and content of each message.
"""
return [
{
"role": message["role"],
@@ -443,15 +484,32 @@ class Conversation(BaseStructure):
def add_tool_output_to_agent(self, role: str, tool_output: dict):
"""
Add a tool output to the conversation history
Add a tool output to the conversation history.
Args:
role (str): The role of the tool.
tool_output (dict): The output from the tool to be added.
"""
self.add(role, tool_output)
def return_json(self):
"""Return the conversation messages as a JSON string.
Returns:
str: The conversation messages formatted as a JSON string.
"""
return json.dumps(
self.return_messages_as_dictionary(), indent=4
)
def get_final_message(self):
"""Return the final message from the conversation history.
Returns:
str: The final message formatted as 'role: content'.
"""
return f"{self.conversation_history[-1]['role']}: {self.conversation_history[-1]['content']}"
# # Example usage
# # conversation = Conversation()

@@ -0,0 +1,461 @@
import asyncio
import concurrent.futures
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Tuple
import aiohttp
from dotenv import load_dotenv
from rich.console import Console
from swarms.agents.reasoning_duo import ReasoningDuo
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.utils.any_to_str import any_to_str
from swarms.utils.formatter import formatter
from swarms.utils.history_output_formatter import (
history_output_formatter,
)
from swarms.utils.str_to_dict import str_to_dict
console = Console()
load_dotenv()
# Number of worker threads for concurrent operations
MAX_WORKERS = (
os.cpu_count() * 2
) # Optimal number of workers based on CPU cores
###############################################################################
# 1. System Prompts for Each Scientist Agent
###############################################################################
def format_exa_results(json_data: Dict[str, Any]) -> str:
"""Formats Exa.ai search results into structured text"""
if "error" in json_data:
return f"### Error\n{json_data['error']}\n"
# Pre-allocate formatted_text list with initial capacity
formatted_text = []
# Extract search metadata
search_params = json_data.get("effectiveFilters", {})
query = search_params.get("query", "General web search")
formatted_text.append(
f"### Exa Search Results for: '{query}'\n\n---\n"
)
# Process results
results = json_data.get("results", [])
if not results:
formatted_text.append("No results found.\n")
return "".join(formatted_text)
def process_result(
result: Dict[str, Any], index: int
) -> List[str]:
"""Process a single result in a thread-safe manner"""
title = result.get("title", "No title")
url = result.get("url", result.get("id", "No URL"))
published_date = result.get("publishedDate", "")
# Handle highlights efficiently
highlights = result.get("highlights", [])
highlight_text = (
"\n".join(
(
h.get("text", str(h))
if isinstance(h, dict)
else str(h)
)
for h in highlights[:3]
)
if highlights
else "No summary available"
)
return [
f"{index}. **{title}**\n",
f" - URL: {url}\n",
f" - Published: {published_date.split('T')[0] if published_date else 'Date unknown'}\n",
f" - Key Points:\n {highlight_text}\n\n",
]
# Process results concurrently
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
future_to_result = {
executor.submit(process_result, result, i + 1): i
for i, result in enumerate(results)
}
# Collect results in order
processed_results = [None] * len(results)
for future in as_completed(future_to_result):
idx = future_to_result[future]
try:
processed_results[idx] = future.result()
except Exception as e:
console.print(
f"[bold red]Error processing result {idx + 1}: {str(e)}[/bold red]"
)
processed_results[idx] = [
f"Error processing result {idx + 1}: {str(e)}\n"
]
# Extend formatted text with processed results in correct order
for result_text in processed_results:
formatted_text.extend(result_text)
return "".join(formatted_text)
async def _async_exa_search(
query: str, **kwargs: Any
) -> Dict[str, Any]:
"""Asynchronous helper function for Exa.ai API requests"""
api_url = "https://api.exa.ai/search"
headers = {
"x-api-key": os.getenv("EXA_API_KEY"),
"Content-Type": "application/json",
}
payload = {
"query": query,
"useAutoprompt": True,
"numResults": kwargs.get("num_results", 10),
"contents": {
"text": True,
"highlights": {"numSentences": 2},
},
**kwargs,
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(
api_url, json=payload, headers=headers
) as response:
if response.status != 200:
return {
"error": f"HTTP {response.status}: {await response.text()}"
}
return await response.json()
except Exception as e:
return {"error": str(e)}
def exa_search(query: str, **kwargs: Any) -> str:
"""Performs web search using Exa.ai API with concurrent processing"""
try:
# Run async search in the event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
response_json = loop.run_until_complete(
_async_exa_search(query, **kwargs)
)
finally:
loop.close()
# Format results concurrently
formatted_text = format_exa_results(response_json)
return formatted_text
except Exception as e:
error_msg = f"Unexpected error: {str(e)}"
console.print(f"[bold red]{error_msg}[/bold red]")
return error_msg
# Define the research tools schema
tools = [
{
"type": "function",
"function": {
"name": "search_topic",
"description": "Conduct an in-depth search on a specified topic or subtopic, generating a comprehensive array of highly detailed search queries tailored to the input parameters.",
"parameters": {
"type": "object",
"properties": {
"depth": {
"type": "integer",
"description": "Indicates the level of thoroughness for the search. Values range from 1 to 3, where 1 represents a superficial search and 3 signifies an exploration of the topic.",
},
"detailed_queries": {
"type": "array",
"description": "An array of highly specific search queries that are generated based on the input query and the specified depth. Each query should be designed to elicit detailed and relevant information from various sources.",
"items": {
"type": "string",
"description": "Each item in this array should represent a unique search query that targets a specific aspect of the main topic, ensuring a comprehensive exploration of the subject matter.",
},
},
},
"required": ["depth", "detailed_queries"],
},
},
},
]
RESEARCH_AGENT_PROMPT = """
You are an advanced research agent specialized in conducting deep, comprehensive research across multiple domains.
Your task is to:
1. Break down complex topics into searchable subtopics
2. Generate diverse search queries to explore each subtopic thoroughly
3. Identify connections and patterns across different areas of research
4. Synthesize findings into coherent insights
5. Identify gaps in current knowledge and suggest areas for further investigation
For each research task:
- Consider multiple perspectives and approaches
- Look for both supporting and contradicting evidence
- Evaluate the credibility and relevance of sources
- Track emerging trends and recent developments
- Consider cross-disciplinary implications
Output Format:
- Provide structured research plans
- Include specific search queries for each subtopic
- Prioritize queries based on relevance and potential impact
- Suggest follow-up areas for deeper investigation
"""
SUMMARIZATION_AGENT_PROMPT = """
You are an expert information synthesis and summarization agent designed for producing clear, accurate, and insightful summaries of complex information. Your core capabilities include:
Core Capabilities:
- Identify and extract key concepts, themes, and insights from any given content
- Recognize patterns, relationships, and hierarchies within information
- Filter out noise while preserving crucial context and nuance
- Handle multiple sources and perspectives simultaneously
Summarization Strategy
1. Multi-level Structure
- Provide an extensive summary
- Follow with key findings
- Include detailed insights with supporting evidence
- End with implications or next steps when relevant
2. Quality Standards
- Maintain factual accuracy and precision
- Preserve important technical details and terminology
- Avoid oversimplification of complex concepts
- Include quantitative data when available
- Cite or reference specific sources when summarizing claims
3. Clarity & Accessibility
- Use clear, concise language
- Define technical terms when necessary
- Structure information logically
- Use formatting to enhance readability
- Maintain appropriate level of technical depth for the audience
4. Synthesis & Analysis
- Identify conflicting information or viewpoints
- Highlight consensus across sources
- Note gaps or limitations in the information
- Draw connections between related concepts
- Provide context for better understanding
OUTPUT REQUIREMENTS:
- Begin with a clear statement of the topic or question being addressed
- Use consistent formatting and structure
- Clearly separate different levels of detail
- Include confidence levels for conclusions when appropriate
- Note any areas requiring additional research or clarification
Remember: Your goal is to make complex information accessible while maintaining accuracy and depth. Prioritize clarity without sacrificing important nuance or detail."""
# Initialize the research agent
research_agent = Agent(
agent_name="Deep-Research-Agent",
agent_description="Specialized agent for conducting comprehensive research across multiple domains",
system_prompt=RESEARCH_AGENT_PROMPT,
max_loops=1,  # Set above 1 to allow multiple iterations for more thorough research
tools_list_dictionary=tools,
)
reasoning_duo = ReasoningDuo(
system_prompt=SUMMARIZATION_AGENT_PROMPT, output_type="string"
)
class DeepResearchSwarm:
def __init__(
self,
name: str = "DeepResearchSwarm",
description: str = "A swarm that conducts comprehensive research across multiple domains",
research_agent: Agent = research_agent,
max_loops: int = 1,
nice_print: bool = True,
output_type: str = "json",
max_workers: int = os.cpu_count()
* 2, # Let the system decide optimal thread count
token_count: bool = False,
):
self.name = name
self.description = description
self.research_agent = research_agent
self.max_loops = max_loops
self.nice_print = nice_print
self.output_type = output_type
self.max_workers = max_workers
self.reliability_check()
self.conversation = Conversation(token_count=token_count)
# Create a persistent ThreadPoolExecutor for the lifetime of the swarm
# This eliminates thread creation overhead on each query
self.executor = concurrent.futures.ThreadPoolExecutor(
max_workers=self.max_workers
)
def __del__(self):
"""Clean up the executor on object destruction"""
self.executor.shutdown(wait=False)
def reliability_check(self):
"""Check the reliability of the query"""
if self.max_loops < 1:
raise ValueError("max_loops must be greater than 0")
formatter.print_panel(
"DeepResearchSwarm is booting up...", "blue"
)
formatter.print_panel("Reliability check passed", "green")
def get_queries(self, query: str) -> List[str]:
"""
Generate a list of detailed search queries based on the input query.
Args:
query (str): The main research query to explore
Returns:
List[str]: A list of detailed search queries
"""
self.conversation.add(role="User", content=query)
# Get the agent's response
agent_output = self.research_agent.run(query)
self.conversation.add(
role=self.research_agent.agent_name, content=agent_output
)
# Convert the string output to dictionary
output_dict = str_to_dict(agent_output)
# Print the conversation history
if self.nice_print:
to_do_list = any_to_str(output_dict)
formatter.print_panel(to_do_list, "blue")
# Extract the detailed queries from the output
if (
isinstance(output_dict, dict)
and "detailed_queries" in output_dict
):
queries = output_dict["detailed_queries"]
formatter.print_panel(
f"Generated {len(queries)} queries", "blue"
)
return queries
return []
def _process_query(self, query: str) -> Tuple[str, str]:
"""
Process a single query with search and reasoning.
This function is designed to be run in a separate thread.
Args:
query (str): The query to process
Returns:
Tuple[str, str]: A tuple containing (search_results, reasoning_output)
"""
# Run the search
results = exa_search(query)
# Run the reasoning on the search results
reasoning_output = reasoning_duo.run(results)
return (results, reasoning_output)
def step(self, query: str):
"""
Execute a single research step with maximum parallelism.
Args:
query (str): The research query to process
Returns:
Formatted conversation history
"""
# Get all the queries to process
queries = self.get_queries(query)
# Submit all queries for concurrent processing
# Using a list instead of generator for clearer debugging
futures = []
for q in queries:
future = self.executor.submit(self._process_query, q)
futures.append((q, future))
# Process results as they complete (no waiting for slower queries)
for q, future in futures:
try:
# Get results (blocks until this specific future is done)
results, reasoning_output = future.result()
# Add search results to conversation
self.conversation.add(
role="User",
content=f"Search results for {q}: \n {results}",
)
# Add reasoning output to conversation
self.conversation.add(
role=reasoning_duo.agent_name,
content=reasoning_output,
)
except Exception as e:
# Handle any errors in the thread
self.conversation.add(
role="System",
content=f"Error processing query '{q}': {str(e)}",
)
# Once all query processing is complete, generate the final summary
# This step runs after all queries to ensure it summarizes all results
final_summary = reasoning_duo.run(
f"Generate an extensive report of the following content: {self.conversation.get_str()}"
)
self.conversation.add(
role=reasoning_duo.agent_name,
content=final_summary,
)
return history_output_formatter(
self.conversation, type=self.output_type
)
# # Example usage
# if __name__ == "__main__":
# swarm = DeepResearchSwarm(
# output_type="json",
# )
# print(
# swarm.step(
# "What is the active tarrif situation with mexico? Only create 2 queries"
# )
# )

@@ -236,7 +236,7 @@ class GroupChat:
self.agents = agents
self.speaker_fn = speaker_fn
self.max_loops = max_loops
-self.conversation = Conversation(time_enabled=True)
+self.conversation = Conversation(time_enabled=False)
self.rules = rules
self.reliability_check()

@@ -257,7 +257,7 @@ class HierarchicalSwarm(BaseSwarm):
self.return_all_history = return_all_history
self.output_type = output_type
self.director_model_name = director_model_name
-self.conversation = Conversation(time_enabled=True)
+self.conversation = Conversation(time_enabled=False)
self.current_loop = 0
self.agent_outputs = {} # Store agent outputs for each loop

@@ -163,7 +163,7 @@ class MajorityVoting:
self.output_type = output_type
self.conversation = Conversation(
-time_enabled=True, *args, **kwargs
+time_enabled=False, *args, **kwargs
)
self.initialize_majority_voting()

@@ -153,7 +153,7 @@ class MultiAgentCollaboration(BaseSwarm):
# Conversation
self.conversation = Conversation(
-time_enabled=True, *args, **kwargs
+time_enabled=False, *args, **kwargs
)
def default_select_next_speaker(

@@ -11,4 +11,6 @@ OutputType = Literal[
".txt",
".yaml",
".toml",
"string",
"str",
]

@@ -21,6 +21,7 @@ from swarms.structs.swarm_matcher import swarm_matcher
from swarms.structs.output_type import OutputType
from swarms.utils.loguru_logger import initialize_logger
from swarms.structs.malt import MALT
from swarms.structs.deep_research_swarm import DeepResearchSwarm
logger = initialize_logger(log_folder="swarm_router")
@@ -37,6 +38,7 @@ SwarmType = Literal[
"auto",
"MajorityVoting",
"MALT",
"DeepResearchSwarm",
]
@@ -316,6 +318,15 @@ class SwarmRouter:
preset_agents=True,
)
elif self.swarm_type == "DeepResearchSwarm":
return DeepResearchSwarm(
name=self.name,
description=self.description,
agents=self.agents,
max_loops=self.max_loops,
output_type=self.output_type,
)
elif self.swarm_type == "HiearchicalSwarm":
return HierarchicalSwarm(
name=self.name,

@@ -142,7 +142,7 @@ class SwarmsAPIClient:
def __init__(
self,
api_key: Optional[str] = None,
-base_url: str = "https://swarms-api-285321057562.us-east1.run.app",
+base_url: str = "https://api.swarms.world",
timeout: int = 30,
max_retries: int = 3,
format_type: Literal["pydantic", "json", "dict"] = "pydantic",

@@ -282,7 +282,7 @@ class ForestSwarm:
self.shared_memory = shared_memory
self.save_file_path = f"forest_swarm_{uuid.uuid4().hex}.json"
self.conversation = Conversation(
-time_enabled=True,
+time_enabled=False,
auto_save=True,
save_filepath=self.save_file_path,
rules=rules,

@@ -10,5 +10,9 @@ def history_output_formatter(
return conversation.to_dict()
elif type == "string" or type == "str":
return conversation.get_str()
elif type == "final":
return conversation.get_final_message()
elif type == "json":
return conversation.to_json()
else:
raise ValueError(f"Invalid type: {type}")

@@ -1,5 +1,11 @@
import asyncio
from typing import List
from loguru import logger
try:
-from litellm import completion
+from litellm import completion, acompletion
except ImportError:
import subprocess
import sys
@@ -33,6 +39,9 @@ class LiteLLM:
max_tokens: int = 4000,
ssl_verify: bool = False,
max_completion_tokens: int = 4000,
tools_list_dictionary: List[dict] = None,
tool_choice: str = "auto",
parallel_tool_calls: bool = False,
*args,
**kwargs,
):
@@ -53,8 +62,9 @@ class LiteLLM:
self.max_tokens = max_tokens
self.ssl_verify = ssl_verify
-self.max_completion_tokens = max_completion_tokens
+self.max_completion_tokens = max_tokens
self.tools_list_dictionary = tools_list_dictionary
self.tool_choice = tool_choice
self.parallel_tool_calls = parallel_tool_calls
def _prepare_messages(self, task: str) -> list:
"""
@@ -77,7 +87,7 @@ class LiteLLM:
return messages
-def run(self, task: str, tools: list = [], *args, **kwargs):
+def run(self, task: str, *args, **kwargs):
"""
Run the LLM model for the given task.
@@ -93,23 +103,45 @@ class LiteLLM:
messages = self._prepare_messages(task)
response = completion(
model=self.model_name,
messages=messages,
stream=self.stream,
temperature=self.temperature,
max_tokens=self.max_tokens,
*args,
**kwargs,
)
content = response.choices[
0
].message.content # Accessing the content
return content
if self.tools_list_dictionary is not None:
response = completion(
model=self.model_name,
messages=messages,
stream=self.stream,
temperature=self.temperature,
max_tokens=self.max_tokens,
tools=self.tools_list_dictionary,
tool_choice=self.tool_choice,
parallel_tool_calls=self.parallel_tool_calls,
*args,
**kwargs,
)
return (
response.choices[0]
.message.tool_calls[0]
.function.arguments
)
else:
response = completion(
model=self.model_name,
messages=messages,
stream=self.stream,
temperature=self.temperature,
max_tokens=self.max_tokens,
*args,
**kwargs,
)
content = response.choices[
0
].message.content # Accessing the content
return content
except Exception as error:
print(error)
logger.error(f"Error in LiteLLM: {error}")
raise error
def __call__(self, task: str, *args, **kwargs):
"""
@@ -124,3 +156,88 @@ class LiteLLM:
str: The content of the response from the model.
"""
return self.run(task, *args, **kwargs)
async def arun(self, task: str, *args, **kwargs):
"""
Run the LLM model for the given task.
Args:
task (str): The task to run the model for.
*args: Additional positional arguments to pass to the model.
**kwargs: Additional keyword arguments to pass to the model.
Returns:
str: The content of the response from the model.
"""
try:
messages = self._prepare_messages(task)
if self.tools_list_dictionary is not None:
response = await acompletion(
model=self.model_name,
messages=messages,
stream=self.stream,
temperature=self.temperature,
max_tokens=self.max_tokens,
tools=self.tools_list_dictionary,
tool_choice=self.tool_choice,
parallel_tool_calls=self.parallel_tool_calls,
*args,
**kwargs,
)
content = (
response.choices[0]
.message.tool_calls[0]
.function.arguments
)
# return response
else:
response = await acompletion(
model=self.model_name,
messages=messages,
stream=self.stream,
temperature=self.temperature,
max_tokens=self.max_tokens,
*args,
**kwargs,
)
content = response.choices[
0
].message.content # Accessing the content
return content
except Exception as error:
logger.error(f"Error in LiteLLM: {error}")
raise error
def batched_run(self, tasks: List[str], batch_size: int = 10):
"""
Run the LLM model for the given tasks in batches.
"""
logger.info(
f"Running tasks in batches of size {batch_size}. Total tasks: {len(tasks)}"
)
results = []
for task in tasks:
logger.info(f"Running task: {task}")
results.append(self.run(task))
logger.info("Completed all tasks.")
return results
def batched_arun(self, tasks: List[str], batch_size: int = 10):
"""
Run the LLM model for the given tasks in batches.
"""
logger.info(
f"Running asynchronous tasks in batches of size {batch_size}. Total tasks: {len(tasks)}"
)
results = []
for task in tasks:
logger.info(f"Running asynchronous task: {task}")
results.append(asyncio.run(self.arun(task)))
logger.info("Completed all asynchronous tasks.")
return results

@@ -0,0 +1,27 @@
import json
from typing import Dict
def str_to_dict(s: str, retries: int = 3) -> Dict:
"""
Converts a JSON string to a dictionary.
Args:
s (str): The JSON string to be converted.
retries (int): The number of times to retry parsing the string in case of a JSONDecodeError. Default is 3.
Returns:
Dict: The parsed dictionary from the JSON string.
Raises:
json.JSONDecodeError: If the string cannot be parsed into a dictionary after the specified number of retries.
"""
for attempt in range(retries):
try:
# Run json.loads directly since it's fast enough
return json.loads(s)
except json.JSONDecodeError as e:
if attempt < retries - 1:
continue # Retry on failure
else:
raise e # Raise the error if all retries fail
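# Illustrative usage (not part of the original file): parsing a tool-call
# arguments string returned by an agent into a Python dict.
# >>> str_to_dict('{"ticker": "AAPL", "include_history": true}')
# {'ticker': 'AAPL', 'include_history': True}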

@@ -0,0 +1,63 @@
# 🚀 Latest Updates to Swarms - Twitter Thread
🧵 Thread: Exciting new features in Swarms - the powerful AI agent framework! #AI #AGI #Development
1/9 🤖 Introducing the ReasoningDuo - a revolutionary dual-agent system that combines reasoning and execution agents for more robust and reliable outputs! Perfect for complex problem-solving tasks.
2/9 🔄 New Self-Consistency Agent with parallel processing:
- Generates multiple independent responses
- Uses ThreadPoolExecutor for concurrent execution
- Aggregates results for higher accuracy
#AI #ParallelProcessing
3/9 🎯 The ReasoningAgentRouter is here! Dynamically select and execute different reasoning strategies:
- ReasoningDuo
- Self-Consistency
- Iterative Reflective Expansion (IRE)
#AIAgents
4/9 💡 Advanced Reasoning Capabilities:
- Structured problem analysis
- Multiple solution exploration
- Bias detection and transparency
- Error handling strategies
#ArtificialIntelligence
5/9 ⚡️ Performance Improvements:
- Concurrent response generation
- Batched task processing
- Optimized thread management
- Improved error handling
#Performance
6/9 🎛️ Customization Options:
- Adjustable sample sizes
- Flexible model selection
- Customizable system prompts
- Multiple output formats
#Flexibility
7/9 🔍 Enhanced Evaluation Features:
- Response validation
- Answer checking
- Majority voting system
- Comprehensive logging
#QualityAssurance
8/9 📊 New Output Types:
- Dictionary format
- List format
- Conversation history
- Structured analysis
#DataScience
9/9 🌟 Coming Soon:
- More agent types
- Enhanced routing strategies
- Advanced aggregation methods
- Expanded model support
Stay tuned! #FutureOfAI
---
Follow for more updates on Swarms! 🚀
#AI #MachineLearning #AGI #Development

@@ -20,7 +20,7 @@ def test_add_message():
def test_add_message_with_time():
-conv = Conversation(time_enabled=True)
+conv = Conversation(time_enabled=False)
conv.add("user", "Hello, world!")
assert len(conv.conversation_history) == 1
assert conv.conversation_history[0]["role"] == "user"

@@ -0,0 +1,70 @@
from litellm import completion
from dotenv import load_dotenv
load_dotenv()
## [OPTIONAL] REGISTER MODEL - not all Ollama models support function calling; litellm defaults to JSON-mode tool calls if native tool calling is not supported.
# litellm.register_model(model_cost={
# "ollama_chat/llama3.1": {
# "supports_function_calling": true
# },
# })
tools = [
{
"type": "function",
"function": {
"name": "get_current_weather",
"description": "Retrieve detailed current weather information for a specified location, including temperature, humidity, wind speed, and atmospheric conditions.",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA, or a specific geographic coordinate in the format 'latitude,longitude'.",
},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit", "kelvin"],
"description": "The unit of temperature measurement to be used in the response.",
},
"include_forecast": {
"type": "boolean",
"description": "Indicates whether to include a short-term weather forecast along with the current conditions.",
},
"time": {
"type": "string",
"format": "date-time",
"description": "Optional parameter to specify the time for which the weather data is requested, in ISO 8601 format.",
},
},
"required": [
"location",
"unit",
"include_forecast",
"time",
],
},
},
}
]
messages = [
{
"role": "user",
"content": "What's the weather like in Boston today?",
}
]
response = completion(
model="gpt-4o-mini",
messages=messages,
tools=tools,
tool_choice="auto",
parallel_tool_calls=True,
)
print(response.choices[0].message.tool_calls[0].function.arguments)
print(response.choices[0].message)
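# Illustrative output (values depend on the model): the first tool call's
# arguments arrive as a JSON string, e.g.
# '{"location": "Boston, MA", "unit": "fahrenheit", "include_forecast": false, "time": "2025-03-04T12:00:00Z"}'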

@@ -0,0 +1,229 @@
import asyncio
import sys
from loguru import logger
from swarms.utils.litellm_wrapper import LiteLLM
# Configure loguru logger
logger.remove() # Remove default handler
logger.add(
"test_litellm.log",
rotation="1 MB",
format="{time} | {level} | {message}",
level="DEBUG",
)
logger.add(sys.stdout, level="INFO")
tools = [
{
"type": "function",
"function": {
"name": "get_current_weather",
"description": "Retrieve detailed current weather information for a specified location, including temperature, humidity, wind speed, and atmospheric conditions.",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA, or a specific geographic coordinate in the format 'latitude,longitude'.",
},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit", "kelvin"],
"description": "The unit of temperature measurement to be used in the response.",
},
"include_forecast": {
"type": "boolean",
"description": "Indicates whether to include a short-term weather forecast along with the current conditions.",
},
"time": {
"type": "string",
"format": "date-time",
"description": "Optional parameter to specify the time for which the weather data is requested, in ISO 8601 format.",
},
},
"required": [
"location",
"unit",
"include_forecast",
"time",
],
},
},
}
]
# Initialize LiteLLM with streaming enabled
llm = LiteLLM(model_name="gpt-4o-mini", tools_list_dictionary=tools)
async def main():
# When streaming is enabled, arun returns a stream of chunks
# We need to handle these chunks directly, not try to access .choices
stream = await llm.arun("What is the weather in San Francisco?")
logger.info(f"Received stream from LLM. {stream}")
if stream is not None:
logger.info(f"Stream is not None. {stream}")
else:
logger.info("Stream is None.")
def run_test_suite():
"""Run all test cases and generate a comprehensive report."""
logger.info("Starting LiteLLM Test Suite")
total_tests = 0
passed_tests = 0
failed_tests = []
def log_test_result(test_name: str, passed: bool, error=None):
nonlocal total_tests, passed_tests
total_tests += 1
if passed:
passed_tests += 1
logger.success(f"{test_name} - PASSED")
else:
failed_tests.append((test_name, error))
logger.error(f"{test_name} - FAILED: {error}")
# Test 1: Basic Initialization
try:
logger.info("Testing basic initialization")
llm = LiteLLM()
assert llm.model_name == "gpt-4o"
assert llm.temperature == 0.5
assert llm.max_tokens == 4000
log_test_result("Basic Initialization", True)
except Exception as e:
log_test_result("Basic Initialization", False, str(e))
# Test 2: Custom Parameters
try:
logger.info("Testing custom parameters")
llm = LiteLLM(
model_name="gpt-3.5-turbo",
temperature=0.7,
max_tokens=2000,
system_prompt="You are a helpful assistant",
)
assert llm.model_name == "gpt-3.5-turbo"
assert llm.temperature == 0.7
assert llm.max_tokens == 2000
assert llm.system_prompt == "You are a helpful assistant"
log_test_result("Custom Parameters", True)
except Exception as e:
log_test_result("Custom Parameters", False, str(e))
# Test 3: Message Preparation
try:
logger.info("Testing message preparation")
llm = LiteLLM(system_prompt="Test system prompt")
messages = llm._prepare_messages("Test task")
assert len(messages) == 2
assert messages[0]["role"] == "system"
assert messages[0]["content"] == "Test system prompt"
assert messages[1]["role"] == "user"
assert messages[1]["content"] == "Test task"
log_test_result("Message Preparation", True)
except Exception as e:
log_test_result("Message Preparation", False, str(e))
# Test 4: Basic Completion
try:
logger.info("Testing basic completion")
llm = LiteLLM()
response = llm.run("What is 2+2?")
assert isinstance(response, str)
assert len(response) > 0
log_test_result("Basic Completion", True)
except Exception as e:
log_test_result("Basic Completion", False, str(e))
    # Tool usage via the streaming helper defined above
    try:
        asyncio.run(main())
        log_test_result("Tool Usage", True)
    except Exception as e:
        log_test_result("Tool Usage", False, str(e))
# Test 5: Tool Calling
try:
logger.info("Testing tool calling")
tools = [
{
"type": "function",
"function": {
"name": "test_function",
"description": "A test function",
"parameters": {
"type": "object",
"properties": {"test": {"type": "string"}},
},
},
}
]
llm = LiteLLM(
tools_list_dictionary=tools,
tool_choice="auto",
model_name="gpt-4o-mini",
)
assert llm.tools_list_dictionary == tools
assert llm.tool_choice == "auto"
log_test_result("Tool Calling Setup", True)
except Exception as e:
log_test_result("Tool Calling Setup", False, str(e))
# Test 6: Async Completion
async def test_async():
try:
logger.info("Testing async completion")
llm = LiteLLM()
response = await llm.arun("What is 3+3?")
assert isinstance(response, str)
assert len(response) > 0
log_test_result("Async Completion", True)
except Exception as e:
log_test_result("Async Completion", False, str(e))
asyncio.run(test_async())
# Test 7: Batched Run
try:
logger.info("Testing batched run")
llm = LiteLLM()
tasks = ["Task 1", "Task 2", "Task 3"]
responses = llm.batched_run(tasks, batch_size=2)
assert isinstance(responses, list)
assert len(responses) == 3
log_test_result("Batched Run", True)
except Exception as e:
log_test_result("Batched Run", False, str(e))
# Generate test report
success_rate = (passed_tests / total_tests) * 100
logger.info("\n=== Test Suite Report ===")
logger.info(f"Total Tests: {total_tests}")
logger.info(f"Passed Tests: {passed_tests}")
logger.info(f"Failed Tests: {len(failed_tests)}")
logger.info(f"Success Rate: {success_rate:.2f}%")
if failed_tests:
logger.error("\nFailed Tests Details:")
for test_name, error in failed_tests:
logger.error(f"{test_name}: {error}")
return {
"total_tests": total_tests,
"passed_tests": passed_tests,
"failed_tests": failed_tests,
"success_rate": success_rate,
}
if __name__ == "__main__":
test_results = run_test_suite()
logger.info(
"Test suite completed. Check test_litellm.log for detailed logs."
)

@ -0,0 +1,142 @@
import os
import requests
from dotenv import load_dotenv
import json
load_dotenv()
# Retrieve API key securely from .env
API_KEY = os.getenv("SWARMS_API_KEY")
BASE_URL = "https://api.swarms.world"
# Headers for secure API communication
headers = {"x-api-key": API_KEY, "Content-Type": "application/json"}
def create_medical_swarm(patient_case: str):
"""
Constructs and triggers a full-stack medical swarm consisting of three agents:
Diagnostic Specialist, Medical Coder, and Treatment Advisor.
Each agent is provided with a comprehensive, detailed system prompt to ensure high reliability.
"""
payload = {
"swarm_name": "Enhanced Medical Diagnostic Swarm",
"description": "A swarm of agents specialized in performing comprehensive medical diagnostics, analysis, and coding.",
"agents": [
{
"agent_name": "Diagnostic Specialist",
"description": "Agent specialized in analyzing patient history, symptoms, lab results, and imaging data to produce accurate diagnoses.",
"system_prompt": (
"You are an experienced, board-certified medical diagnostician with over 20 years of clinical practice. "
"Your role is to analyze all available patient information—including history, symptoms, lab tests, and imaging results—"
"with extreme attention to detail and clinical nuance. Provide a comprehensive differential diagnosis considering "
"common, uncommon, and rare conditions. Always cross-reference clinical guidelines and evidence-based medicine. "
"Explain your reasoning step by step and provide a final prioritized list of potential diagnoses along with their likelihood. "
"Consider patient demographics, comorbidities, and risk factors. Your diagnosis should be reliable, clear, and actionable."
),
"model_name": "openai/gpt-4o",
"role": "worker",
"max_loops": 2,
"max_tokens": 4000,
"temperature": 0.3,
"auto_generate_prompt": False,
"tools_dictionary": [
{
"type": "function",
"function": {
"name": "search_topic",
"description": "Conduct an in-depth search on a specified topic or subtopic, generating a comprehensive array of highly detailed search queries tailored to the input parameters.",
"parameters": {
"type": "object",
"properties": {
"depth": {
"type": "integer",
"description": "Indicates the level of thoroughness for the search. Values range from 1 to 3, where 1 represents a superficial search and 3 signifies an exploration of the topic.",
},
"detailed_queries": {
"type": "array",
"description": "An array of highly specific search queries that are generated based on the input query and the specified depth. Each query should be designed to elicit detailed and relevant information from various sources.",
"items": {
"type": "string",
"description": "Each item in this array should represent a unique search query that targets a specific aspect of the main topic, ensuring a comprehensive exploration of the subject matter.",
},
},
},
"required": [
"depth",
"detailed_queries",
],
},
},
},
],
},
{
"agent_name": "Medical Coder",
"description": "Agent responsible for translating medical diagnoses and procedures into accurate standardized medical codes (ICD-10, CPT, etc.).",
"system_prompt": (
"You are a certified and experienced medical coder, well-versed in ICD-10, CPT, and other coding systems. "
"Your task is to convert detailed medical diagnoses and treatment procedures into precise, standardized codes. "
"Consider all aspects of the clinical documentation including severity, complications, and comorbidities. "
"Provide clear explanations for the codes chosen, referencing the latest coding guidelines and payer policies where relevant. "
"Your output should be comprehensive, reliable, and fully compliant with current medical coding standards."
),
"model_name": "openai/gpt-4o",
"role": "worker",
"max_loops": 1,
"max_tokens": 3000,
"temperature": 0.2,
"auto_generate_prompt": False,
"tools_dictionary": [],
},
{
"agent_name": "Treatment Advisor",
"description": "Agent dedicated to suggesting evidence-based treatment options, including pharmaceutical and non-pharmaceutical interventions.",
"system_prompt": (
"You are a highly knowledgeable medical treatment specialist with expertise in the latest clinical guidelines and research. "
"Based on the diagnostic conclusions provided, your task is to recommend a comprehensive treatment plan. "
"Your suggestions should include first-line therapies, potential alternative treatments, and considerations for patient-specific factors "
"such as allergies, contraindications, and comorbidities. Explain the rationale behind each treatment option and reference clinical guidelines where applicable. "
"Your recommendations should be reliable, detailed, and clearly prioritized based on efficacy and safety."
),
"model_name": "openai/gpt-4o",
"role": "worker",
"max_loops": 1,
"max_tokens": 5000,
"temperature": 0.3,
"auto_generate_prompt": False,
"tools_dictionary": [],
},
],
"max_loops": 3,
"swarm_type": "SequentialWorkflow",
"task": patient_case,
}
# Payload includes the patient case as the task to be processed by the swarm
response = requests.post(
f"{BASE_URL}/v1/swarm/completions",
headers=headers,
json=payload,
)
if response.status_code == 200:
print("Swarm successfully executed!")
return json.dumps(response.json(), indent=4)
else:
print(f"Error {response.status_code}: {response.text}")
return None
# Example Patient Task for the Swarm to diagnose and analyze
if __name__ == "__main__":
patient_case = (
"Patient is a 55-year-old male presenting with severe chest pain, shortness of breath, elevated blood pressure, "
"nausea, and a family history of cardiovascular disease. Blood tests show elevated troponin levels, and EKG indicates ST-segment elevations. "
"The patient is currently unstable. Provide a detailed diagnosis, coding, and treatment plan."
)
diagnostic_output = create_medical_swarm(patient_case)
print(diagnostic_output)
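
    # create_medical_swarm returns a pretty-printed JSON string (or None on
    # failure); a minimal sketch of turning it back into a dict for
    # programmatic use:
    if diagnostic_output is not None:
        result = json.loads(diagnostic_output)
        print(list(result.keys()))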