# swarms/swarms/structs/spreadsheet_swarm.py
import asyncio
import csv
import io
import os
import uuid
from datetime import datetime
from typing import List, Union

import aiofiles
from pydantic import BaseModel, Field

from swarms.structs.agent import Agent
from swarms.structs.base_swarm import BaseSwarm
from swarms.telemetry.capture_sys_data import log_agent_data
from swarms.utils.file_processing import create_file_in_folder
from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger(log_folder="spreadsheet_swarm")

# Unique hex id for this module load.  NOTE: this was previously missing even
# though ``run_id`` is referenced below (SwarmRunMetadata default factory and
# SpreadSheetSwarm.__init__), which raised NameError at import time.
run_id = uuid.uuid4().hex

# Import-time ISO timestamp (kept for backward compatibility with any
# external users of this module-level name).
time = datetime.now().isoformat()
uuid_hex = uuid.uuid4().hex

# Timestamp formatted without ":" or "." so it is safe to embed in filenames
# on every operating system (Windows rejects ":" in paths).
formatted_time = datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
class AgentOutput(BaseModel):
    """A single agent execution record: which agent ran which task, the
    result it produced, and when it finished (numeric-timestamp string)."""

    agent_name: str
    task: str
    result: str
    timestamp: str
class SwarmRunMetadata(BaseModel):
    """Metadata describing one complete swarm run: identity, timing,
    participating agents, and the outputs collected so far."""

    run_id: str = Field(
        # Generate a fresh unique id per instance.  The factory is
        # self-contained on purpose: the previous version closed over a
        # module-level ``run_id`` global that was never defined, raising
        # NameError whenever the default was used.
        default_factory=lambda: f"spreadsheet_swarm_run_{uuid.uuid4().hex}"
    )
    name: str
    description: str
    agents: List[str]
    start_time: str = Field(
        # POSIX timestamp stored as a string (matches the other time fields).
        default_factory=lambda: str(datetime.now().timestamp()),
        description="The start time of the swarm run.",
    )
    end_time: str
    tasks_completed: int
    outputs: List[AgentOutput]
    number_of_agents: int = Field(
        ...,
        description="The number of agents participating in the swarm.",
    )
class SpreadSheetSwarm(BaseSwarm):
    """
    A swarm that processes tasks concurrently using multiple agents.

    Args:
        name (str, optional): The name of the swarm. Defaults to "Spreadsheet-Swarm".
        description (str, optional): The description of the swarm.
        agents (Union[Agent, List[Agent]], optional): The agents participating in the swarm. Defaults to an empty list.
        autosave_on (bool, optional): Whether to enable autosave of swarm metadata. Defaults to True.
        save_file_path (str, optional): The file path to save the swarm metadata as a CSV file.
            Defaults to a generated, timestamped "spreadsheet_swarm_*.csv" filename.
        max_loops (int, optional): The number of times to repeat the swarm tasks. Defaults to 1.
        workspace_dir (str, optional): The directory path of the workspace. Defaults to the value of the "WORKSPACE_DIR" environment variable.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.
    """

    def __init__(
        self,
        name: str = "Spreadsheet-Swarm",
        description: str = "A swarm that processes tasks concurrently using multiple agents and saves the metadata to a CSV file.",
        agents: Union[Agent, List[Agent]] = None,
        autosave_on: bool = True,
        save_file_path: str = None,
        max_loops: int = 1,
        workspace_dir: str = os.getenv("WORKSPACE_DIR"),
        *args,
        **kwargs,
    ):
        # Normalize ``agents`` once so every later use sees a list.  The
        # previous code normalized only the copy passed to super() and then
        # iterated the raw argument, which broke when a single Agent was
        # supplied.  A ``None`` default also avoids the shared
        # mutable-default pitfall of ``agents=[]``.
        if agents is None:
            agents = []
        elif not isinstance(agents, list):
            agents = [agents]

        super().__init__(
            name=name,
            description=description,
            agents=agents,
            *args,
            **kwargs,
        )
        self.name = name
        self.description = description
        self.autosave_on = autosave_on
        self.max_loops = max_loops
        self.workspace_dir = workspace_dir

        # Per-instance unique id used in both the run id and the filename.
        # Previously this referenced an undefined module global -> NameError.
        run_id = uuid.uuid4().hex

        if save_file_path is None:
            # Timestamp with ":" and "." replaced so the filename is valid
            # on every operating system.
            timestamp = (
                datetime.now().isoformat().replace(":", "_").replace(".", "_")
            )
            save_file_path = f"spreadsheet_swarm_{timestamp}_run_id_{run_id}.csv"
        # Honor a caller-supplied path; the previous code unconditionally
        # overwrote it with the generated name.
        self.save_file_path = save_file_path

        self.metadata = SwarmRunMetadata(
            run_id=f"spreadsheet_swarm_run_{run_id}",
            name=name,
            description=description,
            # NOTE(review): other methods use ``agent.agent_name``; ``.name``
            # is kept here for backward compatibility — confirm which
            # attribute Agent actually exposes.
            agents=[agent.name for agent in agents],
            start_time=str(datetime.now().timestamp()),  # numeric timestamp
            end_time="",
            tasks_completed=0,
            outputs=[],
            number_of_agents=len(agents),
        )

        self.reliability_check()
def reliability_check(self):
"""
Check the reliability of the swarm.
Raises:
ValueError: If no agents are provided or no save file path is provided.
"""
logger.info("Checking the reliability of the swarm...")
if not self.agents:
raise ValueError("No agents are provided.")
if not self.save_file_path:
raise ValueError("No save file path is provided.")
if not self.max_loops:
raise ValueError("No max loops are provided.")
logger.info("Swarm reliability check passed.")
logger.info("Swarm is ready to run.")
# @profile_func
def run(self, task: str, *args, **kwargs):
"""
Run the swarm with the specified task.
Args:
task (str): The task to be executed by the swarm.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
str: The JSON representation of the swarm metadata.
"""
logger.info(f"Running the swarm with task: {task}")
self.metadata.start_time = str(datetime.now().timestamp()) # Numeric timestamp
# Check if we're already in an event loop
if asyncio.get_event_loop().is_running():
# If so, create and run tasks directly using `create_task` without `asyncio.run`
task_future = asyncio.create_task(self._run_tasks(task, *args, **kwargs))
asyncio.get_event_loop().run_until_complete(task_future)
else:
# If no event loop is running, run using `asyncio.run`
asyncio.run(self._run_tasks(task, *args, **kwargs))
self.metadata.end_time = str(datetime.now().timestamp()) # Numeric timestamp
# Synchronously save metadata
logger.info("Saving metadata to CSV and JSON...")
asyncio.run(self._save_metadata())
if self.autosave_on:
self.data_to_json_file()
print(log_agent_data(self.metadata.model_dump()))
return self.metadata.model_dump_json(indent=4)
async def _run_tasks(self, task: str, *args, **kwargs):
"""
Run the swarm tasks concurrently.
Args:
task (str): The task to be executed by the swarm.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
"""
tasks = []
for _ in range(self.max_loops):
for agent in self.agents:
# Use asyncio.to_thread to run the blocking task in a thread pool
tasks.append(
asyncio.to_thread(
self._run_agent_task,
agent,
task,
*args,
**kwargs,
)
)
# Run all tasks concurrently
results = await asyncio.gather(*tasks)
# Process the results
for result in results:
self._track_output(*result)
def _run_agent_task(self, agent, task, *args, **kwargs):
"""
Run a single agent's task in a separate thread.
Args:
agent: The agent to run the task for.
task (str): The task to be executed by the agent.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
Tuple[str, str, str]: A tuple containing the agent name, task, and result.
"""
result = agent.run(task, *args, **kwargs)
# Assuming agent.run() is a blocking call
return agent.agent_name, task, result
def _track_output(self, agent_name: str, task: str, result: str):
"""
Track the output of a completed task.
Args:
agent_name (str): The name of the agent that completed the task.
task (str): The task that was completed.
result (str): The result of the completed task.
"""
self.metadata.tasks_completed += 1
self.metadata.outputs.append(
AgentOutput(
agent_name=agent_name,
task=task,
result=result,
timestamp=str(datetime.now().timestamp()), # Numeric timestamp
)
)
def export_to_json(self):
"""
Export the swarm metadata to JSON.
Returns:
str: The JSON representation of the swarm metadata.
"""
return self.metadata.model_dump_json(indent=4)
def data_to_json_file(self):
"""
Save the swarm metadata to a JSON file.
"""
out = self.export_to_json()
create_file_in_folder(
folder_path=f"{self.workspace_dir}/Spreedsheet-Swarm-{self.name}/{self.name}",
file_name=f"spreedsheet-swarm-{self.metadata.run_id}_metadata.json",
content=out,
)
async def _save_metadata(self):
"""
Save the swarm metadata to CSV and JSON.
"""
if self.autosave_on:
await self._save_to_csv()
async def _save_to_csv(self):
"""
Save the swarm metadata to a CSV file.
"""
logger.info(f"Saving swarm metadata to: {self.save_file_path}")
run_id = uuid.uuid4()
# Check if file exists before opening it
file_exists = os.path.exists(self.save_file_path)
async with aiofiles.open(self.save_file_path, mode="a") as file:
# Write header if file doesn't exist
if not file_exists:
header = "Run ID,Agent Name,Task,Result,Timestamp\n"
await file.write(header)
# Write each output as a new row
for output in self.metadata.outputs:
row = f"{run_id},{output.agent_name},{output.task},{output.result},{output.timestamp}\n"
await file.write(row)