[7.1.2] - many files deleted

pull/762/head
Kye Gomez 2 months ago
parent 2e6b2df013
commit 0a39f2d48a

@@ -0,0 +1,34 @@
from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import (
    FINANCIAL_AGENT_SYS_PROMPT,
)
from swarms.structs.majority_voting import MajorityVoting
from dotenv import load_dotenv

load_dotenv()

# Initialize the agent
agent = Agent(
    agent_name="Financial-Analysis-Agent",
    agent_description="Personal finance advisor agent",
    system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
    max_loops=1,
    model_name="gpt-4o",
    dynamic_temperature_enabled=True,
    user_name="swarms_corp",
    retry_attempts=3,
    context_length=8192,
    return_step_meta=False,
    output_type="str",  # "str", "json", "dict", "csv", or "yaml"
    auto_generate_prompt=False,  # Auto-generate the agent's prompt from its name, description, system prompt, and task
    max_tokens=4000,  # Maximum output tokens
    saved_state_path="agent_00.json",
    interactive=False,
)

swarm = MajorityVoting(agents=[agent, agent, agent])

swarm.run(
    "Create a table of super high growth opportunities for AI. I have $40k to invest in ETFs, index funds, and more. Please create a table in markdown.",
)
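Note that the example above registers the same agent instance three times, so all three votes come from one configuration. A minimal sketch of a more typical setup with independently named voters (the agent names below are illustrative, not part of this commit):

# Hypothetical variation: three separately configured agents cast votes.
analysts = [
    Agent(
        agent_name=f"Financial-Analysis-Agent-{i}",
        system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
        model_name="gpt-4o",
        max_loops=1,
    )
    for i in range(3)
]

swarm = MajorityVoting(agents=analysts)
result = swarm.run("Rank three low-cost S&P 500 index funds for a $40k portfolio.")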

@@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
-version = "7.1.1"
+version = "7.1.2"
description = "Swarms - TGSC"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]

@@ -1,3 +1,4 @@
+from concurrent.futures import ThreadPoolExecutor
import json
import os
import subprocess
@@ -316,3 +317,20 @@ class OpenAIAssistant(Agent):
    def call(self, task: str, *args, **kwargs) -> str:
        """Alias for run() to maintain compatibility with different agent interfaces."""
        return self.run(task, *args, **kwargs)

    def batch_run(
        self, tasks: List[str], *args, **kwargs
    ) -> List[Any]:
        """Run a batch of tasks using the OpenAI Assistant."""
        return [self.run(task, *args, **kwargs) for task in tasks]

    def run_concurrently(
        self, tasks: List[str], *args, **kwargs
    ) -> List[Any]:
        """Run a batch of tasks concurrently using the OpenAI Assistant."""
        with ThreadPoolExecutor(
            max_workers=os.cpu_count()
        ) as executor:
            # Close over args/kwargs: executor.map() only forwards
            # iterables, so mapping self.run with **kwargs directly
            # would raise a TypeError.
            return list(
                executor.map(
                    lambda task: self.run(task, *args, **kwargs),
                    tasks,
                )
            )
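A quick usage sketch for the two new helpers; the constructor arguments here are assumptions for illustration, not part of this diff:

# Hypothetical usage of the new batch helpers.
assistant = OpenAIAssistant(
    name="Analyst",
    instructions="Answer in one short paragraph.",
)
sequential = assistant.batch_run(["Summarize Q1.", "Summarize Q2."])
parallel = assistant.run_concurrently(["Summarize Q3.", "Summarize Q4."])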

@@ -6,7 +6,7 @@ from typing import Dict
from swarms.utils.loguru_logger import initialize_logger
-from swarms.telemetry.capture_sys_data import (
+from swarms.telemetry.main import (
    capture_system_data,
    log_agent_data,
)

@@ -11,7 +11,7 @@ from pydantic import (
)
from pydantic.v1 import validator
-from swarms.telemetry.capture_sys_data import (
+from swarms.telemetry.main import (
    capture_system_data,
    log_agent_data,
)

@@ -76,15 +76,7 @@ from swarms.structs.swarming_architectures import (
    star_swarm,
)
from swarms.structs.task import Task
-from swarms.structs.utils import (
-    detect_markdown,
-    distribute_tasks,
-    extract_key_from_json,
-    extract_tokens_from_text,
-    find_agent_by_id,
-    find_token_in_text,
-    parse_tasks,
-)
__all__ = [
    "Agent",
@@ -107,13 +99,6 @@ __all__ = [
    "RoundRobinSwarm",
    "SequentialWorkflow",
    "Task",
-    "detect_markdown",
-    "distribute_tasks",
-    "extract_key_from_json",
-    "extract_tokens_from_text",
-    "find_agent_by_id",
-    "find_token_in_text",
-    "parse_tasks",
    "MixtureOfAgents",
    "GraphWorkflow",
    "Node",

@@ -52,7 +52,7 @@ from swarms.utils.pdf_to_text import pdf_to_text
from swarms.utils.wrapper_clusterop import (
    exec_callable_with_clusterops,
)
-from swarms.telemetry.capture_sys_data import log_agent_data
+from swarms.telemetry.main import log_agent_data
from swarms.agents.agent_print import agent_print
from swarms.utils.litellm_tokenizer import count_tokens

@@ -1,417 +0,0 @@
import json
import logging
import time
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional

import yaml
from pydantic import BaseModel

from swarms.utils.litellm_tokenizer import count_tokens

logger = logging.getLogger(__name__)


class MemoryMetadata(BaseModel):
    """Metadata for memory entries"""

    timestamp: Optional[float] = time.time()
    role: Optional[str] = None
    agent_name: Optional[str] = None
    session_id: Optional[str] = None
    memory_type: Optional[str] = None  # 'short_term' or 'long_term'
    token_count: Optional[int] = None
    message_id: Optional[str] = str(uuid.uuid4())


class MemoryEntry(BaseModel):
    """Single memory entry with content and metadata"""

    content: Optional[str] = None
    metadata: Optional[MemoryMetadata] = None


class MemoryConfig(BaseModel):
    """Configuration for memory manager"""

    max_short_term_tokens: Optional[int] = 4096
    max_entries: Optional[int] = None
    system_messages_token_buffer: Optional[int] = 1000
    enable_long_term_memory: Optional[bool] = False
    auto_archive: Optional[bool] = True
    archive_threshold: Optional[float] = 0.8  # Archive when 80% full


class MemoryManager:
    """
    Manages both short-term and long-term memory for an agent, handling token limits,
    archival, and context retrieval.

    Args:
        config (MemoryConfig): Configuration for memory management
        tokenizer (Optional[Any]): Tokenizer to use for token counting
        long_term_memory (Optional[Any]): Vector store or database for long-term storage
    """

    def __init__(
        self,
        config: MemoryConfig,
        tokenizer: Optional[Any] = None,
        long_term_memory: Optional[Any] = None,
    ):
        self.config = config
        self.tokenizer = tokenizer
        self.long_term_memory = long_term_memory

        # Initialize memories
        self.short_term_memory: List[MemoryEntry] = []
        self.system_messages: List[MemoryEntry] = []

        # Memory statistics
        self.total_tokens_processed: int = 0
        self.archived_entries_count: int = 0

    def create_memory_entry(
        self,
        content: str,
        role: str,
        agent_name: str,
        session_id: str,
        memory_type: str = "short_term",
    ) -> MemoryEntry:
        """Create a new memory entry with metadata"""
        metadata = MemoryMetadata(
            timestamp=time.time(),
            role=role,
            agent_name=agent_name,
            session_id=session_id,
            memory_type=memory_type,
            token_count=count_tokens(content),
        )
        return MemoryEntry(content=content, metadata=metadata)

    def add_memory(
        self,
        content: str,
        role: str,
        agent_name: str,
        session_id: str,
        is_system: bool = False,
    ) -> None:
        """Add a new memory entry to appropriate storage"""
        entry = self.create_memory_entry(
            content=content,
            role=role,
            agent_name=agent_name,
            session_id=session_id,
            memory_type="system" if is_system else "short_term",
        )

        if is_system:
            self.system_messages.append(entry)
        else:
            self.short_term_memory.append(entry)

        # Check if archiving is needed
        if self.should_archive():
            self.archive_old_memories()

        self.total_tokens_processed += entry.metadata.token_count

    def get_current_token_count(self) -> int:
        """Get total tokens in short-term memory"""
        return sum(
            entry.metadata.token_count
            for entry in self.short_term_memory
        )

    def get_system_messages_token_count(self) -> int:
        """Get total tokens in system messages"""
        return sum(
            entry.metadata.token_count
            for entry in self.system_messages
        )

    def should_archive(self) -> bool:
        """Check if archiving is needed based on configuration"""
        if not self.config.auto_archive:
            return False

        current_usage = (
            self.get_current_token_count()
            / self.config.max_short_term_tokens
        )
        return current_usage >= self.config.archive_threshold

    def archive_old_memories(self) -> None:
        """Move older memories to long-term storage"""
        if not self.long_term_memory:
            logger.warning(
                "No long-term memory storage configured for archiving"
            )
            return

        while self.should_archive():
            # Get oldest non-system message
            if not self.short_term_memory:
                break
            oldest_entry = self.short_term_memory.pop(0)

            # Store in long-term memory
            self.store_in_long_term_memory(oldest_entry)
            self.archived_entries_count += 1

    def store_in_long_term_memory(self, entry: MemoryEntry) -> None:
        """Store a memory entry in long-term memory"""
        if self.long_term_memory is None:
            logger.warning(
                "Attempted to store in non-existent long-term memory"
            )
            return

        try:
            self.long_term_memory.add(str(entry.model_dump()))
        except Exception as e:
            logger.error(f"Error storing in long-term memory: {e}")
            # Re-add to short-term if storage fails
            self.short_term_memory.insert(0, entry)

    def get_relevant_context(
        self, query: str, max_tokens: Optional[int] = None
    ) -> str:
        """
        Get relevant context from both memory types

        Args:
            query (str): Query to match against memories
            max_tokens (Optional[int]): Maximum tokens to return

        Returns:
            str: Combined relevant context
        """
        contexts = []

        # Add system messages first
        for entry in self.system_messages:
            contexts.append(entry.content)

        # Add short-term memory
        for entry in reversed(self.short_term_memory):
            contexts.append(entry.content)

        # Query long-term memory if available
        if self.long_term_memory is not None:
            long_term_context = self.long_term_memory.query(query)
            if long_term_context:
                contexts.append(str(long_term_context))

        # Combine and truncate if needed
        combined = "\n".join(contexts)
        if max_tokens:
            combined = self.truncate_to_token_limit(
                combined, max_tokens
            )

        return combined

    def truncate_to_token_limit(
        self, text: str, max_tokens: int
    ) -> str:
        """Truncate text to fit within token limit"""
        current_tokens = count_tokens(text)

        if current_tokens <= max_tokens:
            return text

        # Truncate by splitting into sentences and rebuilding
        sentences = text.split(". ")
        result = []
        current_count = 0

        for sentence in sentences:
            sentence_tokens = count_tokens(sentence)
            if current_count + sentence_tokens <= max_tokens:
                result.append(sentence)
                current_count += sentence_tokens
            else:
                break

        return ". ".join(result)

    def clear_short_term_memory(
        self, preserve_system: bool = True
    ) -> None:
        """Clear short-term memory with option to preserve system messages"""
        if not preserve_system:
            self.system_messages.clear()

        self.short_term_memory.clear()
        # Parenthesized so only the suffix depends on preserve_system;
        # the original's precedence logged an empty string when False.
        logger.info(
            "Cleared short-term memory"
            + (" (preserved system messages)" if preserve_system else "")
        )

    def get_memory_stats(self) -> Dict[str, Any]:
        """Get detailed memory statistics"""
        return {
            "short_term_messages": len(self.short_term_memory),
            "system_messages": len(self.system_messages),
            "current_tokens": self.get_current_token_count(),
            "system_tokens": self.get_system_messages_token_count(),
            "max_tokens": self.config.max_short_term_tokens,
            "token_usage_percent": round(
                (
                    self.get_current_token_count()
                    / self.config.max_short_term_tokens
                )
                * 100,
                2,
            ),
            "has_long_term_memory": self.long_term_memory is not None,
            "archived_entries": self.archived_entries_count,
            "total_tokens_processed": self.total_tokens_processed,
        }

    def save_memory_snapshot(self, file_path: str) -> None:
        """Save current memory state to file"""
        try:
            data = {
                "timestamp": datetime.now().isoformat(),
                "config": self.config.model_dump(),
                "system_messages": [
                    entry.model_dump()
                    for entry in self.system_messages
                ],
                "short_term_memory": [
                    entry.model_dump()
                    for entry in self.short_term_memory
                ],
                "stats": self.get_memory_stats(),
            }

            with open(file_path, "w") as f:
                if file_path.endswith(".yaml"):
                    yaml.dump(data, f)
                else:
                    json.dump(data, f, indent=2)

            logger.info(f"Saved memory snapshot to {file_path}")
        except Exception as e:
            logger.error(f"Error saving memory snapshot: {e}")
            raise

    def load_memory_snapshot(self, file_path: str) -> None:
        """Load memory state from file"""
        try:
            with open(file_path, "r") as f:
                if file_path.endswith(".yaml"):
                    data = yaml.safe_load(f)
                else:
                    data = json.load(f)

            self.config = MemoryConfig(**data["config"])
            self.system_messages = [
                MemoryEntry(**entry)
                for entry in data["system_messages"]
            ]
            self.short_term_memory = [
                MemoryEntry(**entry)
                for entry in data["short_term_memory"]
            ]

            logger.info(f"Loaded memory snapshot from {file_path}")
        except Exception as e:
            logger.error(f"Error loading memory snapshot: {e}")
            raise

    def search_memories(
        self, query: str, memory_type: str = "all"
    ) -> List[MemoryEntry]:
        """
        Search through memories of specified type

        Args:
            query (str): Search query
            memory_type (str): Type of memories to search ("short_term", "system", "long_term", or "all")

        Returns:
            List[MemoryEntry]: Matching memory entries
        """
        results = []

        if memory_type in ["short_term", "all"]:
            results.extend(
                [
                    entry
                    for entry in self.short_term_memory
                    if query.lower() in entry.content.lower()
                ]
            )

        if memory_type in ["system", "all"]:
            results.extend(
                [
                    entry
                    for entry in self.system_messages
                    if query.lower() in entry.content.lower()
                ]
            )

        if (
            memory_type in ["long_term", "all"]
            and self.long_term_memory is not None
        ):
            long_term_results = self.long_term_memory.query(query)
            if long_term_results:
                # Convert long-term results to MemoryEntry format
                for result in long_term_results:
                    content = str(result)
                    metadata = MemoryMetadata(
                        timestamp=time.time(),
                        role="long_term",
                        agent_name="system",
                        session_id="long_term",
                        memory_type="long_term",
                        token_count=count_tokens(content),
                    )
                    results.append(
                        MemoryEntry(
                            content=content, metadata=metadata
                        )
                    )

        return results

    def get_memory_by_timeframe(
        self, start_time: float, end_time: float
    ) -> List[MemoryEntry]:
        """Get memories within a specific timeframe"""
        return [
            entry
            for entry in self.short_term_memory
            if start_time <= entry.metadata.timestamp <= end_time
        ]

    def export_memories(
        self, file_path: str, format: str = "json"
    ) -> None:
        """Export memories to file in specified format"""
        data = {
            "system_messages": [
                entry.model_dump() for entry in self.system_messages
            ],
            "short_term_memory": [
                entry.model_dump() for entry in self.short_term_memory
            ],
            "stats": self.get_memory_stats(),
        }

        with open(file_path, "w") as f:
            if format == "yaml":
                yaml.dump(data, f)
            else:
                json.dump(data, f, indent=2)
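The file above is deleted by this commit. For reference, a minimal sketch of how the removed MemoryManager was driven, using only the API shown in the diff:

# Sketch only; defaults come from the deleted MemoryConfig above.
manager = MemoryManager(config=MemoryConfig(max_short_term_tokens=2048))
manager.add_memory(
    content="User prefers low-cost index funds.",
    role="user",
    agent_name="Financial-Analysis-Agent",
    session_id="session-1",
)
print(manager.get_memory_stats()["token_usage_percent"])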

@@ -1,177 +0,0 @@
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Union

from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
from swarms.utils.loguru_logger import initialize_logger

logger = initialize_logger("company-swarm")


@dataclass
class Company(BaseSwarm):
    """
    Represents a company with a hierarchical organizational structure.
    """

    org_chart: List[List[Agent]]
    shared_instructions: str = None
    ceo: Optional[Agent] = None
    agents: List[Agent] = field(default_factory=list)
    agent_interactions: Dict[str, List[str]] = field(
        default_factory=dict
    )

    def __post_init__(self):
        self._parse_org_chart(self.org_chart)

    def add(self, agent: Agent) -> None:
        """
        Adds an agent to the company.

        Args:
            agent (Agent): The agent to be added.

        Raises:
            ValueError: If an agent with the same ID already exists in the company.
        """
        try:
            if any(
                existing_agent.id == agent.id
                for existing_agent in self.agents
            ):
                raise ValueError(
                    f"Agent with id {agent.id} already exists in the"
                    " company."
                )
            self.agents.append(agent)
        except Exception as error:
            logger.error(
                f"[ERROR][CLASS: Company][METHOD: add] {error}"
            )
            raise error

    def get(self, agent_name: str) -> Agent:
        """
        Retrieves an agent from the company by name.

        Args:
            agent_name (str): The name of the agent to retrieve.

        Returns:
            Agent: The retrieved agent.

        Raises:
            ValueError: If an agent with the specified name does not exist in the company.
        """
        try:
            for agent in self.agents:
                if agent.name == agent_name:
                    return agent
            raise ValueError(
                f"Agent with name {agent_name} does not exist in the"
                " company."
            )
        except Exception as error:
            logger.error(
                f"[ERROR][CLASS: Company][METHOD: get] {error}"
            )
            raise error

    def remove(self, agent: Agent) -> None:
        """
        Removes an agent from the company.

        Args:
            agent (Agent): The agent to be removed.
        """
        try:
            self.agents.remove(agent)
        except Exception as error:
            logger.error(
                f"[ERROR][CLASS: Company][METHOD: remove] {error}"
            )
            raise error

    def _parse_org_chart(
        self, org_chart: Union[List[Agent], List[List[Agent]]]
    ) -> None:
        """
        Parses the organization chart and adds agents to the company.

        Args:
            org_chart (Union[List[Agent], List[List[Agent]]]): The organization chart
                representing the hierarchy of agents.

        Raises:
            ValueError: If more than one CEO is found in the org chart or if an invalid
                agent is encountered.
        """
        try:
            for node in org_chart:
                if isinstance(node, Agent):
                    if self.ceo:
                        raise ValueError("1 CEO is only allowed")
                    self.ceo = node
                    self.add(node)
                elif isinstance(node, list):
                    for agent in node:
                        if not isinstance(agent, Agent):
                            raise ValueError(
                                "Invalid agent in org chart"
                            )
                        self.add(agent)

                    for i, agent in enumerate(node):
                        if i == len(node) - 1:
                            continue
                        # The original iterated over node[i + 1] (a single
                        # Agent) and called the undefined self.__init_task;
                        # pair each agent with the agents after it instead.
                        for other_agent in node[i + 1 :]:
                            self._init_interaction(agent, other_agent)
        except Exception as error:
            logger.error(
                "[ERROR][CLASS: Company][METHOD: _parse_org_chart]"
                f" {error}"
            )
            raise error

    def _init_interaction(
        self,
        agent1: Agent,
        agent2: Agent,
    ) -> None:
        """
        Initializes the interaction between two agents.

        Args:
            agent1 (Agent): The first agent involved in the interaction.
            agent2 (Agent): The second agent involved in the interaction.

        Returns:
            None
        """
        # The original wrote to self.agents_interactions, which is never
        # defined; the dataclass field is agent_interactions.
        if agent1.ai_name not in self.agent_interactions:
            self.agent_interactions[agent1.ai_name] = []

        self.agent_interactions[agent1.ai_name].append(
            agent2.ai_name
        )

    def run(self):
        """
        Run the company
        """
        for (
            agent_name,
            interaction_agents,
        ) in self.agent_interactions.items():
            agent = self.get(agent_name)
            for interaction_agent in interaction_agents:
                task_description = (
                    f"Task for {agent_name} to interact with"
                    f" {interaction_agent}"
                )
                print(f"{task_description} is being executed")
                agent.run(task_description)
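Also deleted in this commit. A sketch of how the removed Company consumed an org chart; agent names are illustrative, and it assumes the Agent objects expose the .id, .name, and .ai_name attributes the class reads:

ceo = Agent(agent_name="CEO")
team = [Agent(agent_name="Researcher"), Agent(agent_name="Writer")]
company = Company(org_chart=[ceo, team])  # one CEO node, then one team level
company.run()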

@@ -13,7 +13,6 @@ from swarms.utils.loguru_logger import initialize_logger

logger = initialize_logger(log_folder="hierarchical_swarm")


class HierarchicalOrder(BaseModel):
    agent_name: str = Field(
        ...,

@@ -1,11 +1,16 @@
import asyncio
import concurrent.futures
import multiprocessing
import os
import re
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Optional
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.utils.file_processing import create_file
from swarms.structs.multi_agent_exec import run_agents_concurrently
from swarms.utils.formatter import formatter
from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger(log_folder="majority_voting")
@@ -139,13 +144,17 @@ class MajorityVoting:
        description: str = "A majority voting system for agents",
        agents: List[Agent] = [],
        output_parser: Optional[Callable] = majority_voting,
        consensus_agent: Optional[Agent] = None,
        autosave: bool = False,
        verbose: bool = False,
        *args,
        **kwargs,
    ):
        self.name = name
        self.description = description
        self.agents = agents
        self.output_parser = output_parser
        self.consensus_agent = consensus_agent
        self.autosave = autosave
        self.verbose = verbose
@@ -153,19 +162,17 @@ class MajorityVoting:
            time_enabled=True, *args, **kwargs
        )

        # If autosave is enabled, save the conversation to a file
        if self.autosave:
            create_file(
                str(self.conversation), "majority_voting.json"
            )

        self.initialize_majority_voting()

    def initialize_majority_voting(self):
        if self.agents is None:
            raise ValueError("Agents list is empty")

-        # Log the agents
-        logger.info("Initializing majority voting system")
-        # Length of agents
-        logger.info(f"Number of agents: {len(self.agents)}")
-        logger.info(
-            "Agents:"
-            f" {', '.join(agent.agent_name for agent in self.agents)}"
-        )
+        formatter.print_panel(
+            f"Initializing majority voting system\nNumber of agents: {len(self.agents)}\nAgents: {', '.join(agent.agent_name for agent in self.agents)}",
+            title="Majority Voting",
+        )
    def run(self, task: str, *args, **kwargs) -> List[Any]:
@@ -181,29 +188,17 @@
            List[Any]: The majority vote.
        """
        # Route to each agent
-        with concurrent.futures.ThreadPoolExecutor() as executor:
-            logger.info("Running agents concurrently")
-            futures = [
-                executor.submit(agent.run, task, *args)
-                for agent in self.agents
-            ]
-            results = [
-                future.result()
-                for future in concurrent.futures.as_completed(futures)
-            ]
+        results = run_agents_concurrently(
+            self.agents, task, max_workers=os.cpu_count()
+        )

        # Add responses to conversation and log them
        for agent, response in zip(self.agents, results):
            response = (
                response if isinstance(response, list) else [response]
            )
            self.conversation.add(agent.agent_name, response)
-            logger.info(
-                f"[Agent][Name: {agent.agent_name}][Response:"
-                f" {response}]"
-            )
        # Perform majority voting on the conversation
        responses = [
@@ -217,8 +212,87 @@
            majority_vote = self.output_parser(
                responses, *args, **kwargs
            )
+        elif self.consensus_agent is not None:
+            majority_vote = self.consensus_agent.run(responses)
        else:
            majority_vote = majority_voting(responses)

        # Return the majority vote
        return majority_vote
    def batch_run(
        self, tasks: List[str], *args, **kwargs
    ) -> List[Any]:
        """
        Runs the majority voting system in batch mode.

        Args:
            tasks (List[str]): List of tasks to be performed by the agents.
            *args: Variable length argument list.
            **kwargs: Arbitrary keyword arguments.

        Returns:
            List[Any]: List of majority votes for each task.
        """
        return [self.run(task, *args, **kwargs) for task in tasks]

    def run_concurrently(
        self, tasks: List[str], *args, **kwargs
    ) -> List[Any]:
        """
        Runs the majority voting system concurrently.

        Args:
            tasks (List[str]): List of tasks to be performed by the agents.
            *args: Variable length argument list.
            **kwargs: Arbitrary keyword arguments.

        Returns:
            List[Any]: List of majority votes for each task.
        """
        with ThreadPoolExecutor(
            max_workers=os.cpu_count()
        ) as executor:
            futures = [
                executor.submit(self.run, task, *args, **kwargs)
                for task in tasks
            ]
            return [
                future.result()
                for future in concurrent.futures.as_completed(futures)
            ]

    def run_concurrently_multiprocess(
        self, tasks: List[str], *args, **kwargs
    ) -> List[Any]:
        """
        Runs the majority voting system concurrently using multiprocessing.

        Args:
            tasks (List[str]): List of tasks to be performed by the agents.
            *args: Variable length argument list.
            **kwargs: Arbitrary keyword arguments.

        Returns:
            List[Any]: List of majority votes for each task.
        """
        # Note: multiprocessing must pickle self.run (and the agents it
        # closes over) to ship work to worker processes.
        with multiprocessing.Pool(processes=os.cpu_count()) as pool:
            return pool.map(self.run, tasks)

    async def run_async(
        self, tasks: List[str], *args, **kwargs
    ) -> List[Any]:
        """
        Runs the majority voting system concurrently using asyncio.

        Args:
            tasks (List[str]): List of tasks to be performed by the agents.
            *args: Variable length argument list.
            **kwargs: Arbitrary keyword arguments.

        Returns:
            List[Any]: List of majority votes for each task.
        """
        # self.run is synchronous, so wrap each call in a thread to obtain
        # awaitables; gathering bare self.run(...) results would fail.
        return await asyncio.gather(
            *[
                asyncio.to_thread(self.run, task, *args, **kwargs)
                for task in tasks
            ]
        )
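A sketch of the new consensus path and batch helper added above; agent_a, agent_b, agent_c, and the judge are illustrative Agent instances, not from this diff:

judge = Agent(agent_name="Consensus-Judge", model_name="gpt-4o", max_loops=1)
mv = MajorityVoting(
    agents=[agent_a, agent_b, agent_c],
    consensus_agent=judge,
    output_parser=None,  # assumption: lets the consensus_agent branch run
)
votes = mv.batch_run(["Draft a budget.", "Stress-test the budget."])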

@@ -5,7 +5,7 @@ from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
from swarms.structs.agent import Agent
-from swarms.telemetry.capture_sys_data import log_agent_data
+from swarms.telemetry.main import log_agent_data
from swarms.schemas.agent_step_schemas import ManySteps
from swarms.prompts.ag_prompt import aggregator_system_prompt
from swarms.utils.loguru_logger import initialize_logger

@@ -15,6 +15,13 @@ from swarms.utils.wrapper_clusterop import (
)


@dataclass
class ResourceMetrics:
    cpu_percent: float
    memory_percent: float
    active_threads: int


def run_single_agent(agent: AgentType, task: str) -> Any:
    """Run a single agent synchronously"""
    return agent.run(task)
@@ -79,7 +86,7 @@ def run_agents_concurrently(
        List of outputs from each agent
    """
    # Optimize defaults based on system resources
-    cpu_cores = cpu_count()
+    cpu_cores = os.cpu_count()
    batch_size = batch_size or cpu_cores
    max_workers = max_workers or cpu_cores * 2
@@ -275,13 +282,6 @@ def run_agents_with_timeout(
    return results


-@dataclass
-class ResourceMetrics:
-    cpu_percent: float
-    memory_percent: float
-    active_threads: int


def get_system_metrics() -> ResourceMetrics:
    """Get current system resource usage"""
    return ResourceMetrics(

@@ -16,7 +16,7 @@ from swarms.utils.loguru_logger import initialize_logger
from swarms.utils.wrapper_clusterop import (
    exec_callable_with_clusterops,
)
-from swarms.telemetry.capture_sys_data import log_agent_data
+from swarms.telemetry.main import log_agent_data

logger = initialize_logger(log_folder="rearrange")

@@ -10,7 +10,7 @@ from pydantic import BaseModel, Field
from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
-from swarms.telemetry.capture_sys_data import log_agent_data
+from swarms.telemetry.main import log_agent_data
from swarms.utils.file_processing import create_file_in_folder
from swarms.utils.loguru_logger import initialize_logger

@@ -4,7 +4,7 @@ import uuid
from pydantic import BaseModel, Field

-class AgentResponde(BaseModel):
+class AgentRespond(BaseModel):
    id: str = Field(default=uuid.uuid4().hex)
    timestamp: str = Field(default=time.time())
    agent_position: int = Field(description="Agent in swarm position")
@@ -18,6 +18,6 @@ class SwarmOutput(BaseModel):
    name: str = Field(description="Swarm name")
    description: str = Field(description="Swarm description")
    swarm_type: str = Field(description="Swarm type")
-    agent_outputs: List[AgentResponde] = Field(
+    agent_outputs: List[AgentRespond] = Field(
        description="List of agent responses"
    )

@@ -1,32 +1,7 @@
-import json
-import re
-from typing import Any, Dict, List, Optional
+from typing import List

from swarms.structs.agent import Agent


-# Helper functions for manager/corporate agents
-def parse_tasks(
-    task: str = None,
-) -> Dict[str, Any]:
-    """Parse tasks
-
-    Args:
-        task (str, optional): _description_. Defaults to None.
-
-    Returns:
-        Dict[str, Any]: _description_
-    """
-    tasks = {}
-    for line in task.split("\n"):
-        # (The original called the nonexistent str.endwith();
-        # endswith is the correct method.)
-        if line.startswith("<agent_id>") and line.endswith(
-            "</agent_id>"
-        ):
-            agent_id, task = line[10:-11].split("><")
-            tasks[agent_id] = task
-    return tasks


def find_agent_by_id(
    agent_id: str = None,
    agents: List[Agent] = None,
@@ -43,106 +18,51 @@ def find_agent_by_id(
    Returns:
        Agent: _description_
    """
-    for agent in agents:
-        if agent.id == agent_id:
-            if task:
-                return agent.run(task, *args, **kwargs)
-            else:
-                return agent
-    return None
+    try:
+        print(f"Searching for agent with ID: {agent_id}")
+        for agent in agents:
+            if agent.id == agent_id:
+                print(f"Found agent with ID {agent_id}")
+                if task:
+                    print(f"Running task: {task}")
+                    return agent.run(task, *args, **kwargs)
+                else:
+                    return agent
+        print(f"No agent found with ID {agent_id}")
+        return None
+    except Exception as e:
+        print(f"Error finding agent by ID: {str(e)}")
+        return None


-def distribute_tasks(
-    task: str = None, agents: List[Agent] = None, *args, **kwargs
-):
-    """Distribute tasks to agents
-
-    Args:
-        task (str, optional): _description_. Defaults to None.
-        agents (List[Agent], optional): _description_. Defaults to None.
-    """
-    # Parse the task to extract tasks and agent id
-    tasks = parse_tasks(task)
-
-    # Distribute tasks to agents (the original called the nonexistent
-    # dict.item(); items() is the correct method)
-    for agent_id, task in tasks.items():
-        assigned_agent = find_agent_by_id(agent_id, agents)
-        if assigned_agent:
-            print(f"Assigning task {task} to agent {agent_id}")
-            output = assigned_agent.run(task, *args, **kwargs)
-            print(f"Output from agent {agent_id}: {output}")
-        else:
-            print(
-                f"No agent found with ID {agent_id}. Task '{task}' is"
-                " not assigned."
-            )


-def find_token_in_text(text: str, token: str = "<DONE>") -> bool:
-    """
-    Parse a block of text for a specific token.
-
-    Args:
-        text (str): The text to parse.
-        token (str): The token to find.
-
-    Returns:
-        bool: True if the token is found in the text, False otherwise.
-    """
-    # Check if the token is in the text
-    if token in text:
-        return True
-    else:
-        return False


-def extract_key_from_json(
-    json_response: str, key: str
-) -> Optional[str]:
-    """
-    Extract a specific key from a JSON response.
-
-    Args:
-        json_response (str): The JSON response to parse.
-        key (str): The key to extract.
-
-    Returns:
-        Optional[str]: The value of the key if it exists, None otherwise.
-    """
-    response_dict = json.loads(json_response)
-    return response_dict.get(key)


-def extract_tokens_from_text(
-    text: str, tokens: List[str]
-) -> List[str]:
-    """
-    Extract a list of tokens from a text response.
-
-    Args:
-        text (str): The text to parse.
-        tokens (List[str]): The tokens to extract.
-
-    Returns:
-        List[str]: The tokens that were found in the text.
-    """
-    return [token for token in tokens if token in text]


-def detect_markdown(text: str) -> bool:
-    """
-    Checks if a string contains Markdown code enclosed in six backticks.
-
-    Parameters
-    ----------
-    text : str
-        The text to check.
-
-    Returns
-    -------
-    bool
-        True if the text contains Markdown code enclosed in six backticks, False otherwise.
-    """
-    pattern = r"``````[\s\S]*?``````"
-    return bool(re.search(pattern, text))


+def find_agent_by_name(
+    agent_name: str = None,
+    agents: List[Agent] = None,
+    task: str = None,
+    *args,
+    **kwargs,
+) -> Agent:
+    """Find agent by name
+
+    Args:
+        agent_name (str): _description_
+        agents (List[Agent]): _description_
+
+    Returns:
+        Agent: _description_
+    """
+    try:
+        print(f"Searching for agent with name: {agent_name}")
+        for agent in agents:
+            if agent.name == agent_name:
+                print(f"Found agent with name {agent_name}")
+                if task:
+                    print(f"Running task: {task}")
+                    return agent.run(task, *args, **kwargs)
+                else:
+                    return agent
+        print(f"No agent found with name {agent_name}")
+        return None
+    except Exception as e:
+        print(f"Error finding agent by name: {str(e)}")
+        return None
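A sketch of the new name-based lookup; it assumes the Agent instances expose the .name attribute this helper compares against:

agents = [Agent(agent_name="Researcher"), Agent(agent_name="Writer")]
writer = find_agent_by_name(agent_name="Writer", agents=agents)
report = find_agent_by_name("Writer", agents, task="Draft a summary.")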

@@ -41,7 +41,7 @@ class WorkspaceManager:
            env_file_path (Path): The path to the .env file.
        """
        with env_file_path.open("w") as file:
-            file.write("WORKSPACE_DIR=agent_workspace\n")
+            file.write(f"WORKSPACE_DIR={self.workspace_dir}\n")
        logger.info(
            "Created a new .env file with default WORKSPACE_DIR."
        )
@@ -57,7 +57,7 @@ class WorkspaceManager:
            content = file.read()
            if "WORKSPACE_DIR" not in content:
                file.seek(0, os.SEEK_END)
-                file.write("WORKSPACE_DIR=agent_workspace\n")
+                file.write(f"WORKSPACE_DIR={self.workspace_dir}\n")
                logger.info("Appended WORKSPACE_DIR to .env file.")
    def _get_workspace_dir(
@@ -150,6 +150,8 @@ class WorkspaceManager:
        try:
            # Check if .env file exists and create it if it doesn't
            env_file_path = Path(".env")

+            # If the .env file doesn't exist, create it
            if not env_file_path.exists():
                self._create_env_file(env_file_path)
            else:

@@ -1,19 +1,17 @@
-from swarms.telemetry.sys_info import (
+from swarms.telemetry.main import (
    generate_unique_identifier,
    generate_user_id,
    get_cpu_info,
    get_machine_id,
    get_os_version,
    get_package_mismatches,
    get_pip_version,
    get_python_version,
    get_ram_info,
    get_swarms_verison,
    system_info,
-)
-from swarms.telemetry.user_utils import (
-    generate_unique_identifier,
-    generate_user_id,
-    get_machine_id,
    get_system_info,
    get_user_device_data,
-    system_info,
)
__all__ = [

@@ -1,87 +0,0 @@
import os
import platform
import socket
import psutil
import uuid
from typing import Dict

import requests

from swarms.utils.loguru_logger import initialize_logger

logger = initialize_logger(log_folder="capture_sys_data")


def capture_system_data() -> Dict[str, str]:
    """
    Captures extensive system data including platform information, user ID, IP address, CPU count,
    memory information, and other system details.

    Returns:
        Dict[str, str]: A dictionary containing system data.
    """
    try:
        system_data = {
            "platform": platform.system(),
            "platform_version": platform.version(),
            "platform_release": platform.release(),
            "hostname": socket.gethostname(),
            "ip_address": socket.gethostbyname(socket.gethostname()),
            "cpu_count": psutil.cpu_count(logical=True),
            "memory_total": f"{psutil.virtual_memory().total / (1024 ** 3):.2f} GB",
            "memory_available": f"{psutil.virtual_memory().available / (1024 ** 3):.2f} GB",
            "user_id": str(uuid.uuid4()),  # Unique user identifier
            "machine_type": platform.machine(),
            "processor": platform.processor(),
            "architecture": platform.architecture()[0],
        }

        # Get external IP address
        try:
            system_data["external_ip"] = requests.get(
                "https://api.ipify.org"
            ).text
        except Exception:
            system_data["external_ip"] = "N/A"

        return system_data
    except Exception as e:
        logger.error("Failed to capture system data: {}", e)
        return {}


def log_agent_data(data_dict: dict) -> dict | None:
    """
    Silently logs agent data to the Swarms database with retry logic.

    Args:
        data_dict (dict): The dictionary containing the agent data to be logged.

    Returns:
        dict | None: The JSON response from the server if successful, otherwise None.
    """
    if not data_dict:
        return None  # Immediately exit if the input is empty

    url = "https://swarms.world/api/get-agents/log-agents"
    headers = {
        "Content-Type": "application/json",
        "Authorization": os.getenv("SWARMS_API_KEY"),
    }

    try:
        response = requests.post(
            url, json=data_dict, headers=headers, timeout=10
        )
        if (
            response.ok and response.text.strip()
        ):  # Check if response is valid and non-empty
            return (
                response.json()
            )  # Parse and return the JSON response
    except (
        requests.exceptions.RequestException,
        requests.exceptions.JSONDecodeError,
    ):
        pass  # Fail silently without any action

    return None  # Return None if anything goes wrong

@@ -0,0 +1,304 @@
import hashlib
import os
import platform
import socket
import subprocess
import uuid
from typing import Dict

import pkg_resources
import psutil
import requests
import toml

from swarms.utils.loguru_logger import initialize_logger

logger = initialize_logger(log_folder="capture_sys_data")


# Helper functions
def generate_user_id():
    """Generate user id

    Returns:
        _type_: _description_
    """
    return str(uuid.uuid4())


def get_machine_id():
    """Get machine id

    Returns:
        _type_: _description_
    """
    raw_id = platform.node()
    hashed_id = hashlib.sha256(raw_id.encode()).hexdigest()
    return hashed_id


def get_system_info():
    """
    Gathers basic system information.

    Returns:
        dict: A dictionary containing system-related information.
    """
    info = {
        "platform": platform.system(),
        "platform_release": platform.release(),
        "platform_version": platform.version(),
        "architecture": platform.machine(),
        "hostname": socket.gethostname(),
        "ip_address": socket.gethostbyname(socket.gethostname()),
        "mac_address": ":".join(
            [
                f"{(uuid.getnode() >> elements) & 0xFF:02x}"
                for elements in range(0, 2 * 6, 8)
            ][::-1]
        ),
        "processor": platform.processor(),
        "python_version": platform.python_version(),
        "Misc": system_info(),
    }
    return info


def generate_unique_identifier():
    """Generate unique identifier

    Returns:
        str: unique id
    """
    system_info = get_system_info()
    unique_id = uuid.uuid5(uuid.NAMESPACE_DNS, str(system_info))
    return str(unique_id)


def get_local_ip():
    """Get local ip

    Returns:
        str: local ip
    """
    return socket.gethostbyname(socket.gethostname())


def get_user_device_data():
    data = {
        "ID": generate_user_id(),
        "Machine ID": get_machine_id(),
        "System Info": get_system_info(),
        "UniqueID": generate_unique_identifier(),
    }
    return data


def get_python_version():
    return platform.python_version()


def get_pip_version() -> str:
    """Get pip version

    Returns:
        str: The version of pip installed
    """
    try:
        pip_version = (
            subprocess.check_output(["pip", "--version"])
            .decode()
            .split()[1]
        )
    except Exception as e:
        pip_version = str(e)
    return pip_version


def get_swarms_verison() -> tuple[str, str]:
    """Get swarms version from both command line and package

    Returns:
        tuple[str, str]: A tuple containing (command line version, package version)
    """
    try:
        swarms_verison_cmd = (
            subprocess.check_output(["swarms", "--version"])
            .decode()
            .split()[1]
        )
    except Exception as e:
        swarms_verison_cmd = str(e)
    swarms_verison_pkg = pkg_resources.get_distribution(
        "swarms"
    ).version
    swarms_verison = swarms_verison_cmd, swarms_verison_pkg
    return swarms_verison


def get_os_version() -> str:
    """Get operating system version

    Returns:
        str: The operating system version and platform details
    """
    return platform.platform()


def get_cpu_info() -> str:
    """Get CPU information

    Returns:
        str: The processor information
    """
    return platform.processor()


def get_ram_info() -> str:
    """Get RAM information

    Returns:
        str: A formatted string containing total, used and free RAM in GB
    """
    vm = psutil.virtual_memory()
    used_ram_gb = vm.used / (1024**3)
    free_ram_gb = vm.free / (1024**3)
    total_ram_gb = vm.total / (1024**3)
    return (
        f"{total_ram_gb:.2f} GB, used: {used_ram_gb:.2f}, free:"
        f" {free_ram_gb:.2f}"
    )


def get_package_mismatches(file_path: str = "pyproject.toml") -> str:
    """Get package version mismatches between pyproject.toml and installed packages

    Args:
        file_path (str, optional): Path to pyproject.toml file. Defaults to "pyproject.toml".

    Returns:
        str: A formatted string containing package version mismatches
    """
    with open(file_path) as file:
        pyproject = toml.load(file)
    dependencies = pyproject["tool"]["poetry"]["dependencies"]
    dev_dependencies = pyproject["tool"]["poetry"]["group"]["dev"][
        "dependencies"
    ]
    dependencies.update(dev_dependencies)

    installed_packages = {
        pkg.key: pkg.version for pkg in pkg_resources.working_set
    }

    mismatches = []
    for package, version_info in dependencies.items():
        if isinstance(version_info, dict):
            version_info = version_info["version"]
        installed_version = installed_packages.get(package)
        if installed_version and version_info.startswith("^"):
            expected_version = version_info[1:]
            if not installed_version.startswith(expected_version):
                mismatches.append(
                    f"\t {package}: Mismatch,"
                    f" pyproject.toml={expected_version},"
                    f" pip={installed_version}"
                )
        else:
            mismatches.append(f"\t {package}: Not found in pip list")

    return "\n" + "\n".join(mismatches)


def system_info() -> dict[str, str]:
    """Get system information including Python, pip, OS, CPU and RAM details

    Returns:
        dict[str, str]: A dictionary containing system information
    """
    return {
        "Python Version": get_python_version(),
        "Pip Version": get_pip_version(),
        # "Swarms Version": swarms_verison,
        "OS Version and Architecture": get_os_version(),
        "CPU Info": get_cpu_info(),
        "RAM Info": get_ram_info(),
    }


def capture_system_data() -> Dict[str, str]:
    """
    Captures extensive system data including platform information, user ID, IP address, CPU count,
    memory information, and other system details.

    Returns:
        Dict[str, str]: A dictionary containing system data.
    """
    try:
        system_data = {
            "platform": platform.system(),
            "platform_version": platform.version(),
            "platform_release": platform.release(),
            "hostname": socket.gethostname(),
            "ip_address": socket.gethostbyname(socket.gethostname()),
            "cpu_count": psutil.cpu_count(logical=True),
            "memory_total": f"{psutil.virtual_memory().total / (1024 ** 3):.2f} GB",
            "memory_available": f"{psutil.virtual_memory().available / (1024 ** 3):.2f} GB",
            "user_id": str(uuid.uuid4()),  # Unique user identifier
            "machine_type": platform.machine(),
            "processor": platform.processor(),
            "architecture": platform.architecture()[0],
        }

        # Get external IP address
        try:
            system_data["external_ip"] = requests.get(
                "https://api.ipify.org"
            ).text
        except Exception:
            system_data["external_ip"] = "N/A"

        return system_data
    except Exception as e:
        logger.error("Failed to capture system data: {}", e)
        return {}


def log_agent_data(data_dict: dict) -> dict | None:
    """
    Silently logs agent data to the Swarms database with retry logic.

    Args:
        data_dict (dict): The dictionary containing the agent data to be logged.

    Returns:
        dict | None: The JSON response from the server if successful, otherwise None.
    """
    if not data_dict:
        return None  # Immediately exit if the input is empty

    url = "https://swarms.world/api/get-agents/log-agents"
    headers = {
        "Content-Type": "application/json",
        "Authorization": os.getenv("SWARMS_API_KEY"),
    }

    try:
        response = requests.post(
            url, json=data_dict, headers=headers, timeout=10
        )
        if (
            response.ok and response.text.strip()
        ):  # Check if response is valid and non-empty
            return (
                response.json()
            )  # Parse and return the JSON response
    except (
        requests.exceptions.RequestException,
        requests.exceptions.JSONDecodeError,
    ):
        pass  # Fail silently without any action

    return None  # Return None if anything goes wrong
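A quick sketch of the consolidated telemetry module in use; log_agent_data reads SWARMS_API_KEY from the environment, and the extra "event" field is illustrative:

data = capture_system_data()   # platform, IPs, CPU/RAM, etc.
data["event"] = "example_run"  # illustrative extra field
result = log_agent_data(data)  # server JSON on success, otherwise None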

@@ -1,138 +0,0 @@
import platform
import subprocess

import pkg_resources
import psutil
import toml


def get_python_version():
    return platform.python_version()


def get_pip_version() -> str:
    """Get pip version

    Returns:
        str: The version of pip installed
    """
    try:
        pip_version = (
            subprocess.check_output(["pip", "--version"])
            .decode()
            .split()[1]
        )
    except Exception as e:
        pip_version = str(e)
    return pip_version


def get_swarms_verison() -> tuple[str, str]:
    """Get swarms version from both command line and package

    Returns:
        tuple[str, str]: A tuple containing (command line version, package version)
    """
    try:
        swarms_verison_cmd = (
            subprocess.check_output(["swarms", "--version"])
            .decode()
            .split()[1]
        )
    except Exception as e:
        swarms_verison_cmd = str(e)
    swarms_verison_pkg = pkg_resources.get_distribution(
        "swarms"
    ).version
    swarms_verison = swarms_verison_cmd, swarms_verison_pkg
    return swarms_verison


def get_os_version() -> str:
    """Get operating system version

    Returns:
        str: The operating system version and platform details
    """
    return platform.platform()


def get_cpu_info() -> str:
    """Get CPU information

    Returns:
        str: The processor information
    """
    return platform.processor()


def get_ram_info() -> str:
    """Get RAM information

    Returns:
        str: A formatted string containing total, used and free RAM in GB
    """
    vm = psutil.virtual_memory()
    used_ram_gb = vm.used / (1024**3)
    free_ram_gb = vm.free / (1024**3)
    total_ram_gb = vm.total / (1024**3)
    return (
        f"{total_ram_gb:.2f} GB, used: {used_ram_gb:.2f}, free:"
        f" {free_ram_gb:.2f}"
    )


def get_package_mismatches(file_path: str = "pyproject.toml") -> str:
    """Get package version mismatches between pyproject.toml and installed packages

    Args:
        file_path (str, optional): Path to pyproject.toml file. Defaults to "pyproject.toml".

    Returns:
        str: A formatted string containing package version mismatches
    """
    with open(file_path) as file:
        pyproject = toml.load(file)
    dependencies = pyproject["tool"]["poetry"]["dependencies"]
    dev_dependencies = pyproject["tool"]["poetry"]["group"]["dev"][
        "dependencies"
    ]
    dependencies.update(dev_dependencies)

    installed_packages = {
        pkg.key: pkg.version for pkg in pkg_resources.working_set
    }

    mismatches = []
    for package, version_info in dependencies.items():
        if isinstance(version_info, dict):
            version_info = version_info["version"]
        installed_version = installed_packages.get(package)
        if installed_version and version_info.startswith("^"):
            expected_version = version_info[1:]
            if not installed_version.startswith(expected_version):
                mismatches.append(
                    f"\t {package}: Mismatch,"
                    f" pyproject.toml={expected_version},"
                    f" pip={installed_version}"
                )
        else:
            mismatches.append(f"\t {package}: Not found in pip list")

    return "\n" + "\n".join(mismatches)


def system_info() -> dict[str, str]:
    """Get system information including Python, pip, OS, CPU and RAM details

    Returns:
        dict[str, str]: A dictionary containing system information
    """
    return {
        "Python Version": get_python_version(),
        "Pip Version": get_pip_version(),
        # "Swarms Version": swarms_verison,
        "OS Version and Architecture": get_os_version(),
        "CPU Info": get_cpu_info(),
        "RAM Info": get_ram_info(),
    }

@@ -1,86 +0,0 @@
import hashlib
import platform
import socket
import uuid

from swarms.telemetry.sys_info import system_info


# Helper functions
def generate_user_id():
    """Generate user id

    Returns:
        _type_: _description_
    """
    return str(uuid.uuid4())


def get_machine_id():
    """Get machine id

    Returns:
        _type_: _description_
    """
    raw_id = platform.node()
    hashed_id = hashlib.sha256(raw_id.encode()).hexdigest()
    return hashed_id


def get_system_info():
    """
    Gathers basic system information.

    Returns:
        dict: A dictionary containing system-related information.
    """
    info = {
        "platform": platform.system(),
        "platform_release": platform.release(),
        "platform_version": platform.version(),
        "architecture": platform.machine(),
        "hostname": socket.gethostname(),
        "ip_address": socket.gethostbyname(socket.gethostname()),
        "mac_address": ":".join(
            [
                f"{(uuid.getnode() >> elements) & 0xFF:02x}"
                for elements in range(0, 2 * 6, 8)
            ][::-1]
        ),
        "processor": platform.processor(),
        "python_version": platform.python_version(),
        "Misc": system_info(),
    }
    return info


def generate_unique_identifier():
    """Generate unique identifier

    Returns:
        str: unique id
    """
    system_info = get_system_info()
    unique_id = uuid.uuid5(uuid.NAMESPACE_DNS, str(system_info))
    return str(unique_id)


def get_local_ip():
    """Get local ip

    Returns:
        str: local ip
    """
    return socket.gethostbyname(socket.gethostname())


def get_user_device_data():
    data = {
        "ID": generate_user_id(),
        "Machine ID": get_machine_id(),
        "System Info": get_system_info(),
        "UniqueID": generate_unique_identifier(),
    }
    return data

@@ -1,4 +1,3 @@
-from swarms.utils.class_args_wrapper import print_class_parameters
from swarms.utils.data_to_text import (
    csv_to_text,
    data_to_text,
@@ -20,7 +19,6 @@ from swarms.utils.calculate_func_metrics import profile_func

__all__ = [
-    "print_class_parameters",
    "csv_to_text",
    "data_to_text",
    "json_to_text",

@@ -1,106 +0,0 @@
# In order to accelerate the ops of creating files, we use the async file creation method.
import os
import asyncio
from aiofiles import open as aio_open
from typing import List


async def async_create_file(file_path: str, content: str) -> None:
    """
    Asynchronously creates a file at the specified path and writes the given content to it.

    Args:
        file_path (str): The path where the file will be created.
        content (str): The content to be written to the file.

    Returns:
        None
    """
    async with aio_open(file_path, "w") as file:
        await file.write(content)


async def create_multiple_files(
    file_paths: List[str], contents: List[str]
) -> None:
    """
    Asynchronously creates multiple files at the specified paths and writes the corresponding content to each file.

    Args:
        file_paths (List[str]): A list of paths where the files will be created.
        contents (List[str]): A list of content to be written to each file, corresponding to the file paths.

    Returns:
        None
    """
    tasks = [
        async_create_file(file_path, content)
        for file_path, content in zip(file_paths, contents)
    ]
    await asyncio.gather(*tasks)


async def create_file_with_directory(
    file_path: str, content: str
) -> None:
    """
    Creates a file with the specified directory path and content. If the directory does not exist, it is created.

    Args:
        file_path (str): The path of the file to be created, including the directory.
        content (str): The content to be written to the file.

    Returns:
        None
    """
    directory = os.path.dirname(file_path)
    if not os.path.exists(directory):
        os.makedirs(directory)

    await async_create_file(file_path, content)


def sync_create_file(file_path: str, content: str) -> None:
    """
    Synchronously creates a file at the specified path and writes the given content to it.

    Args:
        file_path (str): The path where the file will be created.
        content (str): The content to be written to the file.

    Returns:
        None
    """
    asyncio.run(async_create_file(file_path, content))


def sync_create_multiple_files(
    file_paths: List[str], contents: List[str]
) -> None:
    """
    Synchronously creates multiple files at the specified paths and writes the corresponding content to each file.

    Args:
        file_paths (List[str]): A list of paths where the files will be created.
        contents (List[str]): A list of content to be written to each file, corresponding to the file paths.

    Returns:
        None
    """
    asyncio.run(create_multiple_files(file_paths, contents))


def sync_create_file_with_directory(
    file_path: str, content: str
) -> None:
    """
    Synchronously creates a file with the specified directory path and content. If the directory does not exist, it is created.

    Args:
        file_path (str): The path of the file to be created, including the directory.
        content (str): The content to be written to the file.

    Returns:
        None
    """
    asyncio.run(create_file_with_directory(file_path, content))
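This async file utility is also removed. A sketch of the batch API it offered; note that only create_file_with_directory made parent directories, so these paths are flat:

sync_create_multiple_files(
    ["a.txt", "b.txt"],
    ["first file", "second file"],
)
sync_create_file_with_directory("out/report.txt", "nested file")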

@@ -1,36 +0,0 @@
import inspect


def print_class_parameters(cls, api_format: bool = False):
    """
    Print the parameters of a class constructor.

    Parameters:
    cls (type): The class to inspect.

    Example:
    >>> print_class_parameters(Agent)
    Parameter: x, Type: <class 'int'>
    Parameter: y, Type: <class 'int'>
    """
    try:
        # Get the parameters of the class constructor
        sig = inspect.signature(cls.__init__)
        params = sig.parameters

        if api_format:
            param_dict = {}
            for name, param in params.items():
                if name == "self":
                    continue
                param_dict[name] = str(param.annotation)
            return param_dict

        # Print the parameters
        for name, param in params.items():
            if name == "self":
                continue
            print(f"Parameter: {name}, Type: {param.annotation}")
    except Exception as e:
        print(f"An error occurred while inspecting the class: {e}")

@@ -1,53 +0,0 @@
import concurrent.futures
from typing import List, Union

from swarms.structs.agent import Agent


def update_system_prompts(
    agents: List[Union[Agent, str]],
    prompt: str,
) -> List[Agent]:
    """
    Update system prompts for a list of agents concurrently.

    Args:
        agents: List of Agent objects or strings to update
        prompt: The prompt text to append to each agent's system prompt

    Returns:
        List of updated Agent objects
    """
    if not agents:
        return agents

    def update_agent_prompt(agent: Union[Agent, str]) -> Agent:
        # Convert string to Agent if needed
        if isinstance(agent, str):
            agent = Agent(
                agent_name=agent,
                system_prompt=prompt,  # Initialize with the provided prompt
            )
        else:
            # Preserve existing prompt and append new one
            existing_prompt = (
                agent.system_prompt if agent.system_prompt else ""
            )
            agent.system_prompt = existing_prompt + "\n" + prompt
        return agent

    # Use ThreadPoolExecutor for concurrent execution
    max_workers = min(len(agents), 4)  # Reasonable thread count
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=max_workers
    ) as executor:
        futures = []
        for agent in agents:
            future = executor.submit(update_agent_prompt, agent)
            futures.append(future)

        # Collect results as they complete
        updated_agents = []
        for future in concurrent.futures.as_completed(futures):
            updated_agents.append(future.result())

    return updated_agents
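Removed as well. A sketch of the concurrent prompt update, mixing an existing Agent with a bare string that the helper converts:

updated = update_system_prompts(
    agents=[Agent(agent_name="Writer"), "Editor"],
    prompt="Always respond in formal English.",
)
# Results are collected as futures complete, so order may differ from input.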

@@ -1,6 +1,6 @@
import uuid

-from swarms.telemetry.user_utils import (
+from swarms.telemetry.main import (
    generate_unique_identifier,
    generate_user_id,
    get_machine_id,
