[DELETE OLD PACKAGES]

pull/642/head
Your Name 1 month ago
parent b6ca99b513
commit 73d23f1995

@ -0,0 +1,56 @@
import os
from dotenv import load_dotenv
from swarm_models import OpenAIChat
from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import (
FINANCIAL_AGENT_SYS_PROMPT,
)
from async_executor import HighSpeedExecutor
load_dotenv()
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
openai_api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
)
# Initialize the agent
agent = Agent(
agent_name="Financial-Analysis-Agent",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
llm=model,
max_loops=1,
# autosave=True,
# dashboard=False,
# verbose=True,
# dynamic_temperature_enabled=True,
# saved_state_path="finance_agent.json",
# user_name="swarms_corp",
# retry_attempts=1,
# context_length=200000,
# return_step_meta=True,
# output_type="json", # "json", "dict", "csv" OR "string" soon "yaml" and
# auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task
# # artifacts_on=True,
# artifacts_output_path="roth_ira_report",
# artifacts_file_extension=".txt",
# max_tokens=8000,
# return_history=True,
)
def execute_agent(
task: str = "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria. Create a report on this question.",
):
return agent.run(task)
executor = HighSpeedExecutor()
results = executor.run(execute_agent, 2)
print(results)

@ -0,0 +1,131 @@
import asyncio
import multiprocessing as mp
import time
from functools import partial
from typing import Any, Dict, Union
class HighSpeedExecutor:
def __init__(self, num_processes: int = None):
"""
Initialize the executor with configurable number of processes.
If num_processes is None, it uses CPU count.
"""
self.num_processes = num_processes or mp.cpu_count()
async def _worker(
self,
queue: asyncio.Queue,
func: Any,
*args: Any,
**kwargs: Any,
):
"""Async worker that processes tasks from the queue"""
while True:
try:
# Non-blocking get from queue
await queue.get()
await asyncio.get_event_loop().run_in_executor(
None, partial(func, *args, **kwargs)
)
queue.task_done()
except asyncio.CancelledError:
break
async def _distribute_tasks(
self, num_tasks: int, queue: asyncio.Queue
):
"""Distribute tasks across the queue"""
for i in range(num_tasks):
await queue.put(i)
async def execute_batch(
self,
func: Any,
num_executions: int,
*args: Any,
**kwargs: Any,
) -> Dict[str, Union[int, float]]:
"""
Execute the given function multiple times concurrently.
Args:
func: The function to execute
num_executions: Number of times to execute the function
*args, **kwargs: Arguments to pass to the function
Returns:
A dictionary containing the number of executions, duration, and executions per second.
"""
queue = asyncio.Queue()
# Create worker tasks
workers = [
asyncio.create_task(
self._worker(queue, func, *args, **kwargs)
)
for _ in range(self.num_processes)
]
# Start timing
start_time = time.perf_counter()
# Distribute tasks
await self._distribute_tasks(num_executions, queue)
# Wait for all tasks to complete
await queue.join()
# Cancel workers
for worker in workers:
worker.cancel()
# Wait for all workers to finish
await asyncio.gather(*workers, return_exceptions=True)
end_time = time.perf_counter()
duration = end_time - start_time
return {
"executions": num_executions,
"duration": duration,
"executions_per_second": num_executions / duration,
}
def run(
self,
func: Any,
num_executions: int,
*args: Any,
**kwargs: Any,
):
return asyncio.run(
self.execute_batch(func, num_executions, *args, **kwargs)
)
# def example_function(x: int = 0) -> int:
# """Example function to execute"""
# return x * x
# async def main():
# # Create executor with number of CPU cores
# executor = HighSpeedExecutor()
# # Execute the function 1000 times
# result = await executor.execute_batch(
# example_function, num_executions=1000, x=42
# )
# print(
# f"Completed {result['executions']} executions in {result['duration']:.2f} seconds"
# )
# print(
# f"Rate: {result['executions_per_second']:.2f} executions/second"
# )
# if __name__ == "__main__":
# # Run the async main function
# asyncio.run(main())

@ -0,0 +1,244 @@
import os
import asyncio
from pydantic import BaseModel, Field
from typing import List, Dict, Any
from swarms import Agent
from swarm_models import OpenAIChat
from dotenv import load_dotenv
from swarms.utils.formatter import formatter
# Load environment variables
load_dotenv()
# Get OpenAI API key
api_key = os.getenv("OPENAI_API_KEY")
# Define Pydantic schema for agent outputs
class AgentOutput(BaseModel):
"""Schema for capturing the output of each agent."""
agent_name: str = Field(..., description="The name of the agent")
message: str = Field(
...,
description="The agent's response or contribution to the group chat",
)
metadata: Dict[str, Any] = Field(
default_factory=dict,
description="Additional metadata about the agent's response",
)
class GroupChat:
"""
GroupChat class to enable multiple agents to communicate in an asynchronous group chat.
Each agent is aware of all other agents, every message exchanged, and the social context.
"""
def __init__(
self,
name: str,
description: str,
agents: List[Agent],
max_loops: int = 1,
):
"""
Initialize the GroupChat.
Args:
name (str): Name of the group chat.
description (str): Description of the purpose of the group chat.
agents (List[Agent]): A list of agents participating in the chat.
max_loops (int): Maximum number of loops to run through all agents.
"""
self.name = name
self.description = description
self.agents = agents
self.max_loops = max_loops
self.chat_history = (
[]
) # Stores all messages exchanged in the chat
formatter.print_panel(
f"Initialized GroupChat '{self.name}' with {len(self.agents)} agents. Max loops: {self.max_loops}",
title="Groupchat Swarm",
)
async def _agent_conversation(
self, agent: Agent, input_message: str
) -> AgentOutput:
"""
Facilitate a single agent's response to the chat.
Args:
agent (Agent): The agent responding.
input_message (str): The message triggering the response.
Returns:
AgentOutput: The agent's response captured in a structured format.
"""
formatter.print_panel(
f"Agent '{agent.agent_name}' is responding to the message: {input_message}",
title="Groupchat Swarm",
)
response = await asyncio.to_thread(agent.run, input_message)
output = AgentOutput(
agent_name=agent.agent_name,
message=response,
metadata={"context_length": agent.context_length},
)
# logger.debug(f"Agent '{agent.agent_name}' response: {response}")
return output
async def _run(self, initial_message: str) -> List[AgentOutput]:
"""
Execute the group chat asynchronously, looping through all agents up to max_loops.
Args:
initial_message (str): The initial message to start the chat.
Returns:
List[AgentOutput]: The responses of all agents across all loops.
"""
formatter.print_panel(
f"Starting group chat '{self.name}' with initial message: {initial_message}",
title="Groupchat Swarm",
)
self.chat_history.append(
{"sender": "System", "message": initial_message}
)
outputs = []
for loop in range(self.max_loops):
formatter.print_panel(
f"Group chat loop {loop + 1}/{self.max_loops}",
title="Groupchat Swarm",
)
for agent in self.agents:
# Create a custom input message for each agent, sharing the chat history and social context
input_message = (
f"Chat History:\n{self._format_chat_history()}\n\n"
f"Participants:\n"
+ "\n".join(
[
f"- {a.agent_name}: {a.system_prompt}"
for a in self.agents
]
)
+ f"\n\nNew Message: {initial_message}\n\n"
f"You are '{agent.agent_name}'. Remember to keep track of the social context, who is speaking, "
f"and respond accordingly based on your role: {agent.system_prompt}."
)
# Collect agent's response
output = await self._agent_conversation(
agent, input_message
)
outputs.append(output)
# Update chat history with the agent's response
self.chat_history.append(
{
"sender": agent.agent_name,
"message": output.message,
}
)
formatter.print_panel(
"Group chat completed. All agent responses captured.",
title="Groupchat Swarm",
)
return outputs
def run(self, task: str, *args, **kwargs):
return asyncio.run(self.run(task, *args, **kwargs))
def _format_chat_history(self) -> str:
"""
Format the chat history for agents to understand the context.
Returns:
str: The formatted chat history as a string.
"""
return "\n".join(
[
f"{entry['sender']}: {entry['message']}"
for entry in self.chat_history
]
)
def __str__(self) -> str:
"""String representation of the group chat's outputs."""
return self._format_chat_history()
def to_json(self) -> str:
"""JSON representation of the group chat's outputs."""
return [
{"sender": entry["sender"], "message": entry["message"]}
for entry in self.chat_history
]
# Example Usage
if __name__ == "__main__":
load_dotenv()
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
model = OpenAIChat(
openai_api_key=api_key,
model_name="gpt-4o-mini",
temperature=0.1,
)
# Example agents
agent1 = Agent(
agent_name="Financial-Analysis-Agent",
system_prompt="You are a financial analyst specializing in investment strategies.",
llm=model,
max_loops=1,
autosave=False,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
user_name="swarms_corp",
retry_attempts=1,
context_length=200000,
output_type="string",
streaming_on=False,
)
agent2 = Agent(
agent_name="Tax-Adviser-Agent",
system_prompt="You are a tax adviser who provides clear and concise guidance on tax-related queries.",
llm=model,
max_loops=1,
autosave=False,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
user_name="swarms_corp",
retry_attempts=1,
context_length=200000,
output_type="string",
streaming_on=False,
)
# Create group chat
group_chat = GroupChat(
name="Financial Discussion",
description="A group chat for financial analysis and tax advice.",
agents=[agent1, agent2],
)
# Run the group chat
asyncio.run(
group_chat.run(
"How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria? What do you guys think?"
)
)

@ -1,6 +1,5 @@
import os
import asyncio
import threading
from swarms import Agent
from swarm_models import OpenAIChat
import time
@ -40,18 +39,21 @@ agent = Agent(
streaming_on=False,
)
# Function to measure time and memory usage
def measure_time_and_memory(func):
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
memory_usage = psutil.Process().memory_info().rss / 1024 ** 2
memory_usage = psutil.Process().memory_info().rss / 1024**2
print(f"Time taken: {end_time - start_time} seconds")
print(f"Memory used: {memory_usage} MB")
return result
return wrapper
# Function to run the agent asynchronously
@measure_time_and_memory
async def run_agent_async():
@ -61,11 +63,13 @@ async def run_agent_async():
)
)
# Function to run the agent on another thread
@measure_time_and_memory
def run_agent_thread():
asyncio.run(run_agent_async())
# Run the agent asynchronously and on another thread to test the speed
asyncio.run(run_agent_async())
run_agent_thread()

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "6.2.9"
version = "6.3.6"
description = "Swarms - Pytorch"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]
@ -37,6 +37,14 @@ keywords = [
"Generative AI",
"Agent Marketplace",
"Agent Store",
"quant",
"finance",
"algorithmic trading",
"portfolio optimization",
"risk management",
"financial modeling",
"machine learning for finance",
"natural language processing for finance",
]
classifiers = [
"Development Status :: 4 - Beta",
@ -52,27 +60,18 @@ python = ">=3.10,<4.0"
torch = ">=2.1.1,<3.0"
transformers = ">= 4.39.0, <5.0.0"
asyncio = ">=3.4.3,<4.0"
langchain-community = "0.0.29"
langchain-experimental = "0.0.55"
backoff = "2.2.1"
toml = "*"
pypdf = "4.3.1"
loguru = "0.7.2"
loguru = "*"
pydantic = "2.8.2"
tenacity = "8.5.0"
Pillow = "10.4.0"
tenacity = "*"
psutil = "*"
sentry-sdk = {version = "*", extras = ["http"]} # Updated here
python-dotenv = "*"
PyYAML = "*"
docstring_parser = "0.16"
fastapi = "*"
openai = ">=1.30.1,<2.0"
termcolor = "*"
tiktoken = "*"
networkx = "*"
swarms-memory = "*"
black = "*"
aiofiles = "*"
swarm-models = "*"
clusterops = "*"
@ -96,9 +95,7 @@ mypy-protobuf = "^3.0.0"
[tool.poetry.group.test.dependencies]
pytest = "^8.1.1"
termcolor = "^2.4.0"
pandas = "^2.2.2"
fastapi = ">=0.110.1,<0.116.0"
[tool.ruff]
line-length = 70

@ -2,21 +2,16 @@
torch>=2.1.1,<3.0
transformers>=4.39.0,<5.0.0
asyncio>=3.4.3,<4.0
langchain-community==0.0.28
langchain-experimental==0.0.55
backoff==2.2.1
toml
pypdf==4.3.1
ratelimit==2.2.1
loguru==0.7.2
pydantic==2.8.2
tenacity==8.5.0
Pillow==10.4.0
tenacity
rich
psutil
sentry-sdk
python-dotenv
opencv-python-headless
PyYAML
docstring_parser==0.16
black>=23.1,<25.0
@ -26,12 +21,8 @@ types-pytz>=2023.3,<2025.0
types-chardet>=5.0.4.6
mypy-protobuf>=3.0.0
pytest>=8.1.1
termcolor>=2.4.0
pandas>=2.2.2
fastapi>=0.110.1
networkx
swarms-memory
pre-commit
aiofiles
swarm-models
clusterops

@ -24,8 +24,6 @@ import toml
import yaml
from pydantic import BaseModel
from swarm_models.tiktoken_wrapper import TikTokenizer
from termcolor import colored
from swarms.agents.ape_agent import auto_generate_prompt
from swarms.prompts.agent_system_prompts import AGENT_SYSTEM_PROMPT_3
from swarms.prompts.multi_modal_autonomous_instruction_prompt import (
@ -671,11 +669,8 @@ class Agent:
return self.stopping_condition(response)
return False
except Exception as error:
print(
colored(
f"Error checking stopping condition: {error}",
"red",
)
logger.error(
f"Error checking stopping condition: {error}"
)
def dynamic_temperature(self):
@ -688,21 +683,20 @@ class Agent:
try:
if hasattr(self.llm, "temperature"):
# Randomly change the temperature attribute of self.llm object
logger.info("Enabling Random Dyamic Temperature")
self.llm.temperature = random.uniform(0.0, 1.0)
else:
# Use a default temperature
self.llm.temperature = 0.5
except Exception as error:
print(
colored(
f"Error dynamically changing temperature: {error}"
)
logger.error(
f"Error dynamically changing temperature: {error}"
)
def print_dashboard(self):
"""Print dashboard"""
print(colored("Initializing Agent Dashboard...", "yellow"))
formatter.print_panel(
f"Initializing Agent: {self.agent_name}"
)
data = self.to_dict()
@ -710,22 +704,19 @@ class Agent:
# data = json.dumps(data, indent=4)
# json_data = json.dumps(data, indent=4)
print(
colored(
f"""
Agent Dashboard
--------------------------------------------
formatter.print_panel(
f"""
Agent Dashboard
--------------------------------------------
Agent {self.agent_name} is initializing for {self.max_loops} with the following configuration:
----------------------------------------
Agent {self.agent_name} is initializing for {self.max_loops} with the following configuration:
----------------------------------------
Agent Configuration:
Configuration: {data}
Agent Configuration:
Configuration: {data}
----------------------------------------
""",
"green",
)
----------------------------------------
""",
)
def loop_count_print(
@ -737,7 +728,7 @@ class Agent:
loop_count (_type_): _description_
max_loops (_type_): _description_
"""
print(colored(f"\nLoop {loop_count} of {max_loops}", "cyan"))
logger.info(f"\nLoop {loop_count} of {max_loops}")
print("\n")
# Check parameters
@ -761,8 +752,8 @@ class Agent:
self,
task: Optional[str] = None,
img: Optional[str] = None,
is_last: bool = False,
print_task: bool = False,
is_last: Optional[bool] = False,
print_task: Optional[bool] = False,
*args,
**kwargs,
) -> Any:
@ -960,7 +951,7 @@ class Agent:
if self.interactive:
logger.info("Interactive mode enabled.")
user_input = colored(input("You: "), "red")
user_input = formatter.print_panel(input("You: "))
# User-defined exit command
if (
@ -1060,7 +1051,7 @@ class Agent:
except Exception as error:
logger.info(
f"Error running agent: {error} optimize your input parameters"
f"Error running agent: {error} optimize your input parameter"
)
raise error
@ -1261,7 +1252,7 @@ class Agent:
logger.info(f"Running bulk tasks: {inputs}")
return [self.run(**input_data) for input_data in inputs]
except Exception as error:
print(colored(f"Error running bulk run: {error}", "red"))
logger.info(f"Error running bulk run: {error}", "red")
def save(self) -> None:
"""Save the agent history to a file.
@ -1438,9 +1429,7 @@ class Agent:
with open(file_path, "w") as f:
yaml.dump(self.to_dict(), f)
except Exception as error:
logger.error(
colored(f"Error saving agent to YAML: {error}", "red")
)
logger.error(f"Error saving agent to YAML: {error}")
raise error
def get_llm_parameters(self):
@ -1505,7 +1494,7 @@ class Agent:
role=self.user_name, content=data
)
except Exception as error:
print(colored(f"Error ingesting docs: {error}", "red"))
logger.info(f"Error ingesting docs: {error}", "red")
def ingest_pdf(self, pdf: str):
"""Ingest the pdf into the memory
@ -1520,7 +1509,7 @@ class Agent:
role=self.user_name, content=text
)
except Exception as error:
print(colored(f"Error ingesting pdf: {error}", "red"))
logger.info(f"Error ingesting pdf: {error}", "red")
def receieve_message(self, name: str, message: str):
"""Receieve a message"""
@ -1604,12 +1593,10 @@ class Agent:
role=self.user_name, content=text
)
except Exception as error:
print(
colored(
f"Error getting docs from doc folders: {error}",
"red",
)
logger.error(
f"Error getting docs from doc folders: {error}"
)
raise error
def check_end_session_agentops(self):
if self.agent_ops_on is True:
@ -1629,7 +1616,8 @@ class Agent:
try:
# Query the long term memory
if self.long_term_memory is not None:
logger.info(f"Querying long term memory for: {task}")
formatter.print_panel(f"Querying RAG for: {task}")
memory_retrieval = self.long_term_memory.query(
task, *args, **kwargs
)
@ -1638,15 +1626,15 @@ class Agent:
f"Documents Available: {str(memory_retrieval)}"
)
# Count the tokens
memory_token_count = self.tokenizer.count_tokens(
memory_retrieval
)
if memory_token_count > self.memory_chunk_size:
# Truncate the memory by the memory chunk size
memory_retrieval = self.truncate_string_by_tokens(
memory_retrieval, self.memory_chunk_size
)
# # Count the tokens
# memory_token_count = self.tokenizer.count_tokens(
# memory_retrieval
# )
# if memory_token_count > self.memory_chunk_size:
# # Truncate the memory by the memory chunk size
# memory_retrieval = self.truncate_string_by_tokens(
# memory_retrieval, self.memory_chunk_size
# )
self.short_memory.add(
role="Database",

@ -1,8 +1,7 @@
import json
from typing import Any, Dict, List, Optional
from termcolor import colored
from swarms.utils.formatter import formatter
from swarms.structs.agent import Agent
from swarms.structs.base_structure import BaseStructure
from swarms.structs.task import Task
@ -132,9 +131,10 @@ class BaseWorkflow(BaseStructure):
for task in self.tasks:
task.result = None
except Exception as error:
print(
colored(f"Error resetting workflow: {error}", "red"),
formatter.print_panel(
f"Error resetting workflow: {error}"
)
raise error
def get_task_results(self) -> Dict[str, Any]:
"""
@ -148,10 +148,8 @@ class BaseWorkflow(BaseStructure):
task.description: task.result for task in self.tasks
}
except Exception as error:
print(
colored(
f"Error getting task results: {error}", "red"
),
formatter.print_panel(
f"Error getting task results: {error}"
)
def remove_task(self, task: str) -> None:
@ -163,12 +161,10 @@ class BaseWorkflow(BaseStructure):
if task.description != task
]
except Exception as error:
print(
colored(
f"Error removing task from workflow: {error}",
"red",
),
formatter.print_panel(
f"Error removing task from workflow: {error}",
)
raise error
def update_task(self, task: str, **updates) -> None:
"""
@ -203,11 +199,9 @@ class BaseWorkflow(BaseStructure):
f"Task {task} not found in workflow."
)
except Exception as error:
print(
colored(
f"Error updating task in workflow: {error}", "red"
),
)
formatter.print_panel(
f"Error updating task in workflow: {error}"
),
def delete_task(self, task: str) -> None:
"""
@ -240,12 +234,10 @@ class BaseWorkflow(BaseStructure):
f"Task {task} not found in workflow."
)
except Exception as error:
print(
colored(
f"Error deleting task from workflow: {error}",
"red",
),
formatter.print_panel(
f"Error deleting task from workflow: {error}",
)
raise error
def save_workflow_state(
self,
@ -287,23 +279,18 @@ class BaseWorkflow(BaseStructure):
}
json.dump(state, f, indent=4)
except Exception as error:
print(
colored(
f"Error saving workflow state: {error}",
"red",
)
formatter.print_panel(
f"Error saving workflow state: {error}",
)
raise error
def add_objective_to_workflow(self, task: str, **kwargs) -> None:
"""Adds an objective to the workflow."""
try:
print(
colored(
"""
Adding Objective to Workflow...""",
"green",
attrs=["bold", "underline"],
)
formatter.print_panel(
"""
Adding Objective to Workflow...""",
"green",
)
task = Task(
@ -314,12 +301,10 @@ class BaseWorkflow(BaseStructure):
)
self.tasks.append(task)
except Exception as error:
print(
colored(
f"Error adding objective to workflow: {error}",
"red",
)
formatter.print_panel(
f"Error adding objective to workflow: {error}",
)
raise error
def load_workflow_state(
self, filepath: str = None, **kwargs
@ -359,11 +344,8 @@ class BaseWorkflow(BaseStructure):
)
self.tasks.append(task)
except Exception as error:
print(
colored(
f"Error loading workflow state: {error}",
"red",
)
formatter.print_panel(
f"Error loading workflow state: {error}",
)
def workflow_dashboard(self, **kwargs) -> None:
@ -383,25 +365,21 @@ class BaseWorkflow(BaseStructure):
>>> workflow.workflow_dashboard()
"""
print(
colored(
f"""
Sequential Workflow Dashboard
--------------------------------
Name: {self.name}
Description: {self.description}
task_pool: {len(self.task_pool)}
Max Loops: {self.max_loops}
Autosave: {self.autosave}
Autosave Filepath: {self.saved_state_filepath}
Restore Filepath: {self.restore_state_filepath}
--------------------------------
Metadata:
kwargs: {kwargs}
""",
"cyan",
attrs=["bold", "underline"],
)
formatter.print_panel(
f"""
Sequential Workflow Dashboard
--------------------------------
Name: {self.name}
Description: {self.description}
task_pool: {len(self.task_pool)}
Max Loops: {self.max_loops}
Autosave: {self.autosave}
Autosave Filepath: {self.saved_state_filepath}
Restore Filepath: {self.restore_state_filepath}
--------------------------------
Metadata:
kwargs: {kwargs}
"""
)
def workflow_bootup(self, **kwargs) -> None:
@ -409,11 +387,6 @@ class BaseWorkflow(BaseStructure):
Workflow bootup.
"""
print(
colored(
"""
Sequential Workflow Initializing...""",
"green",
attrs=["bold", "underline"],
)
formatter.print_panel(
"""Sequential Workflow Initializing...""",
)

@ -3,10 +3,9 @@ import json
from typing import Any, Optional
import yaml
from termcolor import colored
from swarms.structs.base_structure import BaseStructure
from typing import TYPE_CHECKING
from swarms.utils.formatter import formatter
if TYPE_CHECKING:
from swarms.structs.agent import (
@ -191,18 +190,9 @@ class Conversation(BaseStructure):
Args:
detailed (bool, optional): detailed. Defaults to False.
"""
role_to_color = {
"system": "red",
"user": "green",
"assistant": "blue",
"function": "magenta",
}
for message in self.conversation_history:
print(
colored(
f"{message['role']}: {message['content']}\n\n",
role_to_color[message["role"]],
)
formatter.print_panel(
f"{message['role']}: {message['content']}\n\n"
)
def export_conversation(self, filename: str, *args, **kwargs):
@ -307,46 +297,36 @@ class Conversation(BaseStructure):
for message in messages:
if message["role"] == "system":
print(
colored(
f"system: {message['content']}\n",
role_to_color[message["role"]],
)
formatter.print_panel(
f"system: {message['content']}\n",
role_to_color[message["role"]],
)
elif message["role"] == "user":
print(
colored(
f"user: {message['content']}\n",
role_to_color[message["role"]],
)
formatter.print_panel(
f"user: {message['content']}\n",
role_to_color[message["role"]],
)
elif message["role"] == "assistant" and message.get(
"function_call"
):
print(
colored(
f"assistant: {message['function_call']}\n",
role_to_color[message["role"]],
)
formatter.print_panel(
f"assistant: {message['function_call']}\n",
role_to_color[message["role"]],
)
elif message["role"] == "assistant" and not message.get(
"function_call"
):
print(
colored(
f"assistant: {message['content']}\n",
role_to_color[message["role"]],
)
formatter.print_panel(
f"assistant: {message['content']}\n",
role_to_color[message["role"]],
)
elif message["role"] == "tool":
print(
colored(
(
f"function ({message['name']}):"
f" {message['content']}\n"
),
role_to_color[message["role"]],
)
formatter.print_panel(
(
f"function ({message['name']}):"
f" {message['content']}\n"
),
role_to_color[message["role"]],
)
def truncate_memory_with_tokenizer(self):

@ -86,9 +86,7 @@ class MixtureOfAgents:
self.input_schema = MixtureOfAgentsInput(
name=name,
description=description,
agents=[
agent.to_dict() for agent in self.agents
],
agents=[agent.to_dict() for agent in self.agents],
aggregator_agent=aggregator_agent.to_dict(),
aggregator_system_prompt=self.aggregator_system_prompt,
layers=self.layers,

@ -414,7 +414,7 @@ def run_agents_with_tasks_concurrently(
List[Any]: A list of outputs from each agent execution.
"""
# Make the first agent not use the ifrs
if no_clusterops:
return _run_agents_with_tasks_concurrently(
agents, tasks, batch_size, max_workers

@ -121,16 +121,14 @@ class AgentRearrange(BaseSwarm):
output_type: OutputType = "final",
docs: List[str] = None,
doc_folder: str = None,
device: str = "cpu",
device_id: int = 0,
all_cores: bool = False,
all_gpus: bool = True,
no_use_clusterops: bool = True,
*args,
**kwargs,
):
# reliability_check(
# agents=agents,
# name=name,
# description=description,
# flow=flow,
# max_loops=max_loops,
# )
super(AgentRearrange, self).__init__(
name=name,
description=description,
@ -150,6 +148,11 @@ class AgentRearrange(BaseSwarm):
self.output_type = output_type
self.docs = docs
self.doc_folder = doc_folder
self.device = device
self.device_id = device_id
self.all_cores = all_cores
self.all_gpus = all_gpus
self.no_use_clusterops = no_use_clusterops
self.swarm_history = {
agent.agent_name: [] for agent in agents
}
@ -506,7 +509,11 @@ class AgentRearrange(BaseSwarm):
Returns:
The result from executing the task through the cluster operations wrapper.
"""
if no_use_clusterops:
no_use_clusterops = (
no_use_clusterops or self.no_use_clusterops
)
if no_use_clusterops is True:
return self._run(
task=task,
img=img,

@ -107,7 +107,7 @@ class SequentialWorkflow:
all_cores: bool = False,
all_gpus: bool = False,
device_id: int = 0,
no_use_clusterops: bool = False,
no_use_clusterops: bool = True,
*args,
**kwargs,
) -> str:

@ -1,7 +1,6 @@
import json
from typing import Any, Dict, List, Union
from termcolor import cprint
from transformers import PreTrainedModel, PreTrainedTokenizer
from pydantic import BaseModel
from swarms.tools.logits_processor import (
@ -68,15 +67,6 @@ class Jsonformer:
self.temperature = temperature
self.max_string_token_length = max_string_token_length
def debug(self, caller: str, value: str, is_prompt: bool = False):
if self.debug_on:
if is_prompt:
cprint(caller, "green", end=" ")
cprint(value, "yellow")
else:
cprint(caller, "green", end=" ")
cprint(value, "blue")
def generate_number(
self, temperature: Union[float, None] = None, iterations=0
):

@ -3,8 +3,7 @@ from typing import Any, List
import inspect
from typing import Callable
from termcolor import colored
from swarms.utils.formatter import formatter
def scrape_tool_func_docs(fn: Callable) -> str:
@ -37,17 +36,16 @@ def scrape_tool_func_docs(fn: Callable) -> str:
f" {inspect.getdoc(fn)}\nParameters:\n{parameters_str}"
)
except Exception as error:
print(
colored(
(
f"Error scraping tool function docs {error} try"
" optimizing your inputs with different"
" variables and attempt once more."
),
"red",
)
error_msg = (
formatter.print_panel(
f"Error scraping tool function docs {error} try"
" optimizing your inputs with different"
" variables and attempt once more."
),
)
raise error
def tool_find_by_name(tool_name: str, tools: List[Any]):
"""Find the tool by name"""

@ -0,0 +1,203 @@
from typing import Any
import inspect
from functools import partial
import logging
class NameResolver:
"""Utility class for resolving names of various objects"""
@staticmethod
def get_name(obj: Any, default: str = "unnamed_callable") -> str:
"""
Get the name of any object with multiple fallback strategies.
Args:
obj: The object to get the name from
default: Default name if all strategies fail
Returns:
str: The resolved name
"""
strategies = [
# Try getting __name__ attribute
lambda x: getattr(x, "__name__", None),
# Try getting class name
lambda x: (
x.__class__.__name__
if hasattr(x, "__class__")
else None
),
# Try getting function name if it's a partial
lambda x: (
x.func.__name__ if isinstance(x, partial) else None
),
# Try getting the name from the class's type
lambda x: type(x).__name__,
# Try getting qualname
lambda x: getattr(x, "__qualname__", None),
# Try getting the module and class name
lambda x: (
f"{x.__module__}.{x.__class__.__name__}"
if hasattr(x, "__module__")
else None
),
# For async functions
lambda x: (
x.__name__ if inspect.iscoroutinefunction(x) else None
),
# For classes with custom __str__
lambda x: (
str(x)
if hasattr(x, "__str__")
and x.__str__ != object.__str__
else None
),
# For wrapped functions
lambda x: (
getattr(x, "__wrapped__", None).__name__
if hasattr(x, "__wrapped__")
else None
),
]
# Try each strategy
for strategy in strategies:
try:
name = strategy(obj)
if name and isinstance(name, str):
return name.replace(" ", "_").replace("-", "_")
except Exception:
continue
# Return default if all strategies fail
return default
@staticmethod
def get_callable_details(obj: Any) -> dict:
"""
Get detailed information about a callable object.
Returns:
dict: Dictionary containing:
- name: The resolved name
- type: The type of callable
- signature: The signature if available
- module: The module name if available
- doc: The docstring if available
"""
details = {
"name": NameResolver.get_name(obj),
"type": "unknown",
"signature": None,
"module": getattr(obj, "__module__", "unknown"),
"doc": inspect.getdoc(obj)
or "No documentation available",
}
# Determine the type
if inspect.isclass(obj):
details["type"] = "class"
elif inspect.iscoroutinefunction(obj):
details["type"] = "async_function"
elif inspect.isfunction(obj):
details["type"] = "function"
elif isinstance(obj, partial):
details["type"] = "partial"
elif callable(obj):
details["type"] = "callable"
# Try to get signature
try:
details["signature"] = str(inspect.signature(obj))
except (ValueError, TypeError):
details["signature"] = "Unknown signature"
return details
@classmethod
def get_safe_name(cls, obj: Any, max_retries: int = 3) -> str:
"""
Safely get a name with retries and validation.
Args:
obj: Object to get name from
max_retries: Maximum number of retry attempts
Returns:
str: A valid name string
"""
retries = 0
last_error = None
while retries < max_retries:
try:
name = cls.get_name(obj)
# Validate and clean the name
if name:
# Remove invalid characters
clean_name = "".join(
c
for c in name
if c.isalnum() or c in ["_", "."]
)
# Ensure it starts with a letter or underscore
if (
not clean_name[0].isalpha()
and clean_name[0] != "_"
):
clean_name = f"_{clean_name}"
return clean_name
except Exception as e:
last_error = e
retries += 1
# If all retries failed, generate a unique fallback name
import uuid
fallback = f"callable_{uuid.uuid4().hex[:8]}"
logging.warning(
f"Failed to get name after {max_retries} retries. Using fallback: {fallback}. "
f"Last error: {str(last_error)}"
)
return fallback
# # Example usage
# if __name__ == "__main__":
# def test_resolver():
# # Test cases
# class TestClass:
# def method(self):
# pass
# async def async_func():
# pass
# test_cases = [
# TestClass, # Class
# TestClass(), # Instance
# async_func, # Async function
# lambda x: x, # Lambda
# partial(print, end=""), # Partial
# TestClass.method, # Method
# print, # Built-in function
# str, # Built-in class
# ]
# resolver = NameResolver()
# print("\nName Resolution Results:")
# print("-" * 50)
# for obj in test_cases:
# details = resolver.get_callable_details(obj)
# safe_name = resolver.get_safe_name(obj)
# print(f"\nObject: {obj}")
# print(f"Safe Name: {safe_name}")
# print(f"Details: {details}")
# test_resolver()

@ -1,4 +1,4 @@
from termcolor import colored
from swarms.utils.formatter import formatter
def display_markdown_message(message: str, color: str = "cyan"):
@ -12,13 +12,10 @@ def display_markdown_message(message: str, color: str = "cyan"):
if line == "":
print()
elif line == "---":
print(colored("-" * 50, color))
formatter.print_panel("-" * 50)
else:
print(colored(line, color))
formatter.print_panel(line)
if "\n" not in message and message.startswith(">"):
# Aesthetic choice. For these tags, they need a space below them
print()
# display_markdown_message("I love you and you are beautiful.", "cyan")

Loading…
Cancel
Save