Refactor timestamp handling and improve file naming in spreadsheet_swarm.py

pull/733/merge^2
harshalmore31 3 months ago committed by GitHub
parent 9bc2015bbd
commit 41f4011826
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,6 +1,6 @@
import asyncio
import csv
from datetime import datetime
import datetime
import os
import uuid
from typing import Dict, List, Union
@ -16,8 +16,21 @@ from swarms.utils.loguru_logger import initialize_logger
# Module-level logger, created with log_folder="spreadsheet_swarm".
logger = initialize_logger(log_folder="spreadsheet_swarm")
# Random UUID hex string, generated once when this module is imported.
run_id = uuid.uuid4().hex
# ISO-8601 string of the naive local time, captured once at import time
# (it is not refreshed per run or per call).
# NOTE(review): the name `time` would shadow the stdlib `time` module if that
# module were ever imported in this file — confirm the name is intended.
time = datetime.datetime.now().isoformat()
# A second, independent random UUID hex string, also generated once at import.
uuid_hex = uuid.uuid4().hex
# Import-time timestamp formatted with "-" instead of ":" in the time part, so
# the value contains no colons (Windows does not allow ":" in file names).
formatted_time = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
class AgentConfig(BaseModel):
    """Configuration for an agent loaded from CSV.

    All four fields are required strings (no defaults are declared).
    """

    # NOTE(review): the field meanings below are inferred from the field
    # names; the code that builds AgentConfig from CSV rows is not shown
    # here — confirm against the loader.
    agent_name: str  # presumably the agent's name
    description: str  # presumably a description of the agent
    system_prompt: str  # presumably the agent's system prompt
    task: str  # presumably the task assigned to the agent
class AgentOutput(BaseModel):
agent_name: str
@ -25,16 +38,15 @@ class AgentOutput(BaseModel):
result: str
timestamp: str
class SwarmRunMetadata(BaseModel):
run_id: str = Field(
default_factory=lambda: f"spreadsheet_swarm_run_{run_id}"
default_factory=lambda: f"spreadsheet_swarm_run_{uuid_hex}"
)
name: str
description: str
agents: List[str]
start_time: str = Field(
default_factory=lambda: str(datetime.now().timestamp()), # Numeric timestamp
default_factory=lambda: time,
description="The start time of the swarm run.",
)
end_time: str
@ -45,7 +57,6 @@ class SwarmRunMetadata(BaseModel):
description="The number of agents participating in the swarm.",
)
class SpreadSheetSwarm(BaseSwarm):
"""
A swarm that processes tasks concurrently using multiple agents.
@ -65,7 +76,7 @@ class SpreadSheetSwarm(BaseSwarm):
def __init__(
self,
name: str = "Spreadsheet-Swarm",
description: str = "A swarm that processes tasks concurrently using multiple agents and saves the metadata to a CSV file.",
description: str = "A swarm that that processes tasks concurrently using multiple agents and saves the metadata to a CSV file.",
agents: Union[Agent, List[Agent]] = [],
autosave_on: bool = True,
save_file_path: str = None,
@ -88,19 +99,22 @@ class SpreadSheetSwarm(BaseSwarm):
self.autosave_on = autosave_on
self.max_loops = max_loops
self.workspace_dir = workspace_dir
self.load_path = load_path
self.agent_configs: Dict[str, AgentConfig] = {}
# Create a timestamp without colons or periods
timestamp = datetime.now().isoformat().replace(":", "_").replace(".", "_")
# Use this timestamp in the CSV filename
self.save_file_path = f"spreadsheet_swarm_{timestamp}_run_id_{run_id}.csv"
# --------------- NEW CHANGE START ---------------
# The save_file_path is built from formatted_time only (uuid_hex is not part of the file name)
self.save_file_path = (
f"spreadsheet_swarm_run_id_{formatted_time}.csv"
)
# --------------- NEW CHANGE END ---------------
self.metadata = SwarmRunMetadata(
run_id=f"spreadsheet_swarm_run_{run_id}",
run_id=f"spreadsheet_swarm_run_{formatted_time}",
name=name,
description=description,
agents=[agent.name for agent in agents],
start_time=str(datetime.now().timestamp()), # Numeric timestamp
start_time=time,
end_time="",
tasks_completed=0,
outputs=[],
@ -166,10 +180,22 @@ class SpreadSheetSwarm(BaseSwarm):
),
docs=[row["docs"]] if "docs" in row else "",
dynamic_temperature_enabled=True,
max_loops=row["max_loops"] if "max_loops" in row else 1,
user_name=row["user_name"] if "user_name" in row else "user",
max_loops=(
row["max_loops"]
if "max_loops" in row
else 1
),
user_name=(
row["user_name"]
if "user_name" in row
else "user"
),
# output_type="str",
stopping_token=row["stopping_token"] if "stopping_token" in row else None,
stopping_token=(
row["stopping_token"]
if "stopping_token" in row
else None
),
)
# Add agent to swarm
@ -252,8 +278,7 @@ class SpreadSheetSwarm(BaseSwarm):
print(log_agent_data(self.metadata.model_dump()))
return self.metadata.model_dump_json(indent=4)
def run(self, task: str = None, *args, **kwargs):
"""
Run the swarm with the specified task.
@ -267,30 +292,11 @@ class SpreadSheetSwarm(BaseSwarm):
str: The JSON representation of the swarm metadata.
"""
logger.info(f"Running the swarm with task: {task}")
self.metadata.start_time = str(datetime.now().timestamp()) # Numeric timestamp
# Check if we're already in an event loop
if asyncio.get_event_loop().is_running():
# If so, create and run tasks directly using `create_task` without `asyncio.run`
task_future = asyncio.create_task(self._run_tasks(task, *args, **kwargs))
asyncio.get_event_loop().run_until_complete(task_future)
else:
# If no event loop is running, run using `asyncio.run`
asyncio.run(self._run_tasks(task, *args, **kwargs))
self.metadata.end_time = str(datetime.now().timestamp()) # Numeric timestamp
# Synchronously save metadata
logger.info("Saving metadata to CSV and JSON...")
asyncio.run(self._save_metadata())
if self.autosave_on:
self.data_to_json_file()
print(log_agent_data(self.metadata.model_dump()))
return self.metadata.model_dump_json(indent=4)
try:
return asyncio.run(self._run(task, *args, **kwargs))
except Exception as e:
logger.error(f"Error running swarm: {e}")
raise e
async def _run_tasks(self, task: str, *args, **kwargs):
"""
@ -360,7 +366,7 @@ class SpreadSheetSwarm(BaseSwarm):
agent_name=agent_name,
task=task,
result=result,
timestamp=str(datetime.now().timestamp()), # Numeric timestamp
timestamp=time,
)
)
@ -381,7 +387,7 @@ class SpreadSheetSwarm(BaseSwarm):
create_file_in_folder(
folder_path=f"{self.workspace_dir}/Spreedsheet-Swarm-{self.name}/{self.name}",
file_name=f"spreedsheet-swarm-{self.metadata.run_id}_metadata.json",
file_name=f"spreedsheet-swarm-{uuid_hex}-metadata.json",
content=out,
)
@ -396,19 +402,38 @@ class SpreadSheetSwarm(BaseSwarm):
"""
Save the swarm metadata to a CSV file.
"""
logger.info(f"Saving swarm metadata to: {self.save_file_path}")
logger.info(
f"Saving swarm metadata to: {self.save_file_path}"
)
run_id = uuid.uuid4()
# Check if file exists before opening it
file_exists = os.path.exists(self.save_file_path)
async with aiofiles.open(self.save_file_path, mode="a") as file:
async with aiofiles.open(
self.save_file_path, mode="a"
) as file:
writer = csv.writer(file)
# Write header if file doesn't exist
if not file_exists:
header = "Run ID,Agent Name,Task,Result,Timestamp\n"
await file.write(header)
await writer.writerow(
[
"Run ID",
"Agent Name",
"Task",
"Result",
"Timestamp",
]
)
# Write each output as a new row
for output in self.metadata.outputs:
row = f"{run_id},{output.agent_name},{output.task},{output.result},{output.timestamp}\n"
await file.write(row)
await writer.writerow(
[
str(run_id),
output.agent_name,
output.task,
output.result,
output.timestamp,
]
)
Loading…
Cancel
Save