pull/626/head
Your Name 2 months ago
parent 92b5930414
commit c2d4756d1b

@ -123,4 +123,3 @@ if __name__ == "__main__":
- Fund operations and administration
Create a comprehensive architecture that integrates all these components into a fully automated system."""
)

@ -39,7 +39,11 @@ class Artifact(BaseModel):
versions (List[FileVersion]): The list of file versions.
edit_count (int): The number of times the file has been edited.
"""
folder_path: str = Field(default=os.getenv("WORKSPACE_DIR"), description="The path to the folder")
folder_path: str = Field(
default=os.getenv("WORKSPACE_DIR"),
description="The path to the folder",
)
file_path: str = Field(..., description="The path to the file")
file_type: str = Field(
...,
@ -247,44 +251,48 @@ class Artifact(BaseModel):
def save_as(self, output_format: str) -> None:
"""
Saves the artifact's contents in the specified format.
Args:
output_format (str): The desired output format ('.md', '.txt', '.pdf', '.py')
Raises:
ValueError: If the output format is not supported
"""
supported_formats = {'.md', '.txt', '.pdf', '.py'}
supported_formats = {".md", ".txt", ".pdf", ".py"}
if output_format not in supported_formats:
raise ValueError(f"Unsupported output format. Supported formats are: {supported_formats}")
output_path = os.path.splitext(self.file_path)[0] + output_format
if output_format == '.pdf':
raise ValueError(
f"Unsupported output format. Supported formats are: {supported_formats}"
)
output_path = (
os.path.splitext(self.file_path)[0] + output_format
)
if output_format == ".pdf":
self._save_as_pdf(output_path)
else:
with open(output_path, 'w', encoding='utf-8') as f:
if output_format == '.md':
with open(output_path, "w", encoding="utf-8"):
if output_format == ".md":
# Create the file in the specified folder
create_file_in_folder(
self.folder_path,
self.file_path,
f"{os.path.basename(self.file_path)}\n\n{self.contents}"
f"{os.path.basename(self.file_path)}\n\n{self.contents}",
)
elif output_format == '.py':
# Add Python file header
elif output_format == ".py":
# Add Python file header
create_file_in_folder(
self.folder_path,
self.file_path,
f"#{os.path.basename(self.file_path)}\n\n{self.contents}"
f"#{os.path.basename(self.file_path)}\n\n{self.contents}",
)
else: # .txt
create_file_in_folder(
self.folder_path,
self.file_path,
self.contents
self.contents,
)
def _save_as_pdf(self, output_path: str) -> None:
@ -294,11 +302,11 @@ class Artifact(BaseModel):
try:
from reportlab.pdfgen import canvas
from reportlab.lib.pagesizes import letter
c = canvas.Canvas(output_path, pagesize=letter)
# Split content into lines
y = 750 # Starting y position
for line in self.contents.split('\n'):
for line in self.contents.split("\n"):
c.drawString(50, y, line)
y -= 15 # Move down for next line
if y < 50: # New page if bottom reached
@ -306,7 +314,9 @@ class Artifact(BaseModel):
y = 750
c.save()
except ImportError:
raise ImportError("reportlab package is required for PDF output. Install with: pip install reportlab")
raise ImportError(
"reportlab package is required for PDF output. Install with: pip install reportlab"
)
# # Example usage
@ -326,13 +336,13 @@ class Artifact(BaseModel):
# print(artifact.get_metrics())
# Testing saving in different artifact types
# Testing saving in different artifact types
# Create an artifact
#artifact = Artifact(file_path="/path/to/file", file_type=".txt",contents="", edit_count=0 )
#artifact.create("This is some content\nWith multiple lines")
# artifact = Artifact(file_path="/path/to/file", file_type=".txt",contents="", edit_count=0 )
# artifact.create("This is some content\nWith multiple lines")
# Save in different formats
#artifact.save_as(".md") # Creates example.md
#artifact.save_as(".txt") # Creates example.txt
#artifact.save_as(".pdf") # Creates example.pdf
#artifact.save_as(".py") # Creates example.py
# artifact.save_as(".md") # Creates example.md
# artifact.save_as(".txt") # Creates example.txt
# artifact.save_as(".pdf") # Creates example.pdf
# artifact.save_as(".py") # Creates example.py

@ -54,6 +54,7 @@ from swarms.utils.file_processing import create_file_in_folder
from swarms.utils.pdf_to_text import pdf_to_text
from swarms.artifacts.main_artifact import Artifact
# Utils
# Custom stopping condition
def stop_when_repeats(response: str) -> bool:
@ -175,7 +176,7 @@ class Agent:
timeout (int): The timeout
artifacts_on (bool): Enable artifacts
artifacts_output_path (str): The artifacts output path
artifacts_file_extension (str): The artifacts file extension
artifacts_file_extension (str): The artifacts file extension (.pdf, .md, .txt, )
Methods:
run: Run the agent
@ -208,7 +209,7 @@ class Agent:
run_async_concurrent: Run the agent asynchronously and concurrently
construct_dynamic_prompt: Construct the dynamic prompt
handle_artifacts: Handle artifacts
Examples:
>>> from swarm_models import OpenAIChat
@ -575,8 +576,6 @@ class Agent:
# Telemetry Processor to log agent data
threading.Thread(target=self.log_agent_data).start()
def check_if_no_prompt_then_autogenerate(self, task: str = None):
"""
@ -982,10 +981,14 @@ class Agent:
self.short_memory.get_str()
)
)
# Handle artifacts
if self.artifacts_on is True:
self.handle_artifacts(concat_strings(all_responses), self.artifacts_output_path, self.artifacts_file_extension)
self.handle_artifacts(
concat_strings(all_responses),
self.artifacts_output_path,
self.artifacts_file_extension,
)
# More flexible output types
if (
@ -2294,31 +2297,40 @@ class Agent:
except Exception as e:
logger.error(f"An error occurred during execution: {e}")
raise e
def handle_artifacts(self, text: str, file_output_path: str, file_extension: str) -> None:
def handle_artifacts(
self, text: str, file_output_path: str, file_extension: str
) -> None:
"""Handle creating and saving artifacts with error handling."""
try:
logger.info(f"Creating artifact for file: {file_output_path}")
logger.info(
f"Creating artifact for file: {file_output_path}"
)
artifact = Artifact(
file_path=file_output_path,
file_type=file_extension,
contents=text,
edit_count=0,
)
logger.info(f"Saving artifact with extension: {file_extension}")
logger.info(
f"Saving artifact with extension: {file_extension}"
)
artifact.save_as(file_extension)
logger.success(f"Successfully saved artifact to {file_output_path}")
logger.success(
f"Successfully saved artifact to {file_output_path}"
)
except ValueError as e:
logger.error(f"Invalid input values for artifact: {str(e)}")
logger.error(
f"Invalid input values for artifact: {str(e)}"
)
raise
except IOError as e:
logger.error(f"Error saving artifact to file: {str(e)}")
raise
except Exception as e:
logger.error(f"Unexpected error handling artifact: {str(e)}")
logger.error(
f"Unexpected error handling artifact: {str(e)}"
)
raise

@ -9,8 +9,7 @@ from multiprocessing import cpu_count
from swarms.structs.agent import Agent
from swarms.utils.calculate_func_metrics import profile_func
import sys
# Type definitions
AgentType = Union[Agent, Callable]

@ -5,7 +5,7 @@ import uuid
from loguru import logger
from typing import Dict
import requests
import time
def capture_system_data() -> Dict[str, str]:
"""
@ -33,7 +33,9 @@ def capture_system_data() -> Dict[str, str]:
# Get external IP address
try:
system_data["external_ip"] = requests.get("https://api.ipify.org").text
system_data["external_ip"] = requests.get(
"https://api.ipify.org"
).text
except Exception as e:
logger.warning("Failed to retrieve external IP: {}", e)
system_data["external_ip"] = "N/A"
@ -44,8 +46,9 @@ def capture_system_data() -> Dict[str, str]:
return {}
def log_agent_data(data_dict: dict, retry_attempts: int = 1) -> dict | None:
def log_agent_data(
data_dict: dict, retry_attempts: int = 1
) -> dict | None:
"""
Logs agent data to the Swarms database with retry logic.
@ -55,7 +58,7 @@ def log_agent_data(data_dict: dict, retry_attempts: int = 1) -> dict | None:
Returns:
dict | None: The JSON response from the server if successful, otherwise None.
Raises:
ValueError: If data_dict is empty or invalid
requests.exceptions.RequestException: If API request fails after all retries
@ -71,20 +74,22 @@ def log_agent_data(data_dict: dict, retry_attempts: int = 1) -> dict | None:
}
try:
response = requests.post(url, json=data_dict, headers=headers, timeout=10)
response = requests.post(
url, json=data_dict, headers=headers, timeout=10
)
response.raise_for_status()
result = response.json()
return result
except requests.exceptions.Timeout:
logger.warning("Request timed out")
except requests.exceptions.HTTPError as e:
logger.error(f"HTTP error occurred: {e}")
if response.status_code == 401:
logger.error("Authentication failed - check API key")
except requests.exceptions.RequestException as e:
logger.error(f"Error logging agent data: {e}")

@ -2,25 +2,27 @@ import unittest
import os
from unittest.mock import patch, mock_open
import tempfile
import sys
from pathlib import Path
from datetime import datetime
import json
from swarms.artifacts.main_artifact import Artifact
class TestArtifactSaveAs(unittest.TestCase):
def setUp(self):
"""Set up test fixtures before each test method."""
self.temp_dir = tempfile.mkdtemp()
self.test_file_path = os.path.join(self.temp_dir, "test_file.txt")
self.test_content = "This is test content\nWith multiple lines"
self.test_file_path = os.path.join(
self.temp_dir, "test_file.txt"
)
self.test_content = (
"This is test content\nWith multiple lines"
)
# Create artifact with all required fields
self.artifact = Artifact(
file_path=self.test_file_path,
file_type=".txt",
contents=self.test_content, # Provide initial content
edit_count=0
edit_count=0,
)
self.artifact.create(self.test_content)
@ -31,7 +33,7 @@ class TestArtifactSaveAs(unittest.TestCase):
os.remove(self.test_file_path)
# Clean up any potential output files
base_path = os.path.splitext(self.test_file_path)[0]
for ext in ['.md', '.txt', '.py', '.pdf']:
for ext in [".md", ".txt", ".py", ".pdf"]:
output_file = base_path + ext
if os.path.exists(output_file):
os.remove(output_file)
@ -41,68 +43,73 @@ class TestArtifactSaveAs(unittest.TestCase):
def test_save_as_txt(self):
"""Test saving artifact as .txt file"""
output_path = os.path.splitext(self.test_file_path)[0] + '.txt'
self.artifact.save_as('.txt')
output_path = (
os.path.splitext(self.test_file_path)[0] + ".txt"
)
self.artifact.save_as(".txt")
self.assertTrue(os.path.exists(output_path))
with open(output_path, 'r', encoding='utf-8') as f:
with open(output_path, "r", encoding="utf-8") as f:
content = f.read()
self.assertEqual(content, self.test_content)
def test_save_as_markdown(self):
"""Test saving artifact as .md file"""
output_path = os.path.splitext(self.test_file_path)[0] + '.md'
self.artifact.save_as('.md')
output_path = os.path.splitext(self.test_file_path)[0] + ".md"
self.artifact.save_as(".md")
self.assertTrue(os.path.exists(output_path))
with open(output_path, 'r', encoding='utf-8') as f:
with open(output_path, "r", encoding="utf-8") as f:
content = f.read()
self.assertIn(self.test_content, content)
self.assertIn('# test_file.txt', content)
self.assertIn("# test_file.txt", content)
def test_save_as_python(self):
"""Test saving artifact as .py file"""
output_path = os.path.splitext(self.test_file_path)[0] + '.py'
self.artifact.save_as('.py')
output_path = os.path.splitext(self.test_file_path)[0] + ".py"
self.artifact.save_as(".py")
self.assertTrue(os.path.exists(output_path))
with open(output_path, 'r', encoding='utf-8') as f:
with open(output_path, "r", encoding="utf-8") as f:
content = f.read()
self.assertIn(self.test_content, content)
self.assertIn('"""', content)
self.assertIn('Generated Python file', content)
self.assertIn("Generated Python file", content)
@patch('builtins.open', new_callable=mock_open)
@patch("builtins.open", new_callable=mock_open)
def test_file_writing_called(self, mock_file):
"""Test that file writing is actually called"""
self.artifact.save_as('.txt')
self.artifact.save_as(".txt")
mock_file.assert_called()
mock_file().write.assert_called_with(self.test_content)
def test_invalid_format(self):
"""Test saving artifact with invalid format"""
with self.assertRaises(ValueError):
self.artifact.save_as('.invalid')
self.artifact.save_as(".invalid")
def test_export_import_json(self):
"""Test exporting and importing JSON format"""
json_path = os.path.join(self.temp_dir, "test.json")
# Export to JSON
self.artifact.export_to_json(json_path)
self.assertTrue(os.path.exists(json_path))
# Import from JSON and convert timestamp back to string
with open(json_path, 'r') as f:
with open(json_path, "r") as f:
data = json.loads(f.read())
# Ensure timestamps are strings
for version in data.get('versions', []):
if isinstance(version.get('timestamp'), str):
version['timestamp'] = version['timestamp']
for version in data.get("versions", []):
if isinstance(version.get("timestamp"), str):
version["timestamp"] = version["timestamp"]
# Import the modified data
imported_artifact = Artifact(**data)
self.assertEqual(imported_artifact.contents, self.test_content)
self.assertEqual(
imported_artifact.contents, self.test_content
)
# Cleanup
os.remove(json_path)
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()

@ -97,26 +97,35 @@ def execute_generated_code(code: str) -> Any:
try:
exec_namespace = {}
exec(code, exec_namespace)
# Check for any callable functions in the namespace
main_function = None
for item in exec_namespace.values():
if callable(item) and not item.__name__.startswith('__'):
if callable(item) and not item.__name__.startswith("__"):
main_function = item
break
if main_function:
result = main_function()
logger.info(f"Code execution successful. Function result: {result}")
logger.info(
f"Code execution successful. Function result: {result}"
)
return result
elif "result" in exec_namespace:
logger.info(f"Code execution successful. Result variable: {exec_namespace['result']}")
return exec_namespace['result']
logger.info(
f"Code execution successful. Result variable: {exec_namespace['result']}"
)
return exec_namespace["result"]
else:
logger.warning("Code execution completed but no result found")
logger.warning(
"Code execution completed but no result found"
)
return "No result or function found in executed code."
except Exception as e:
logger.error(f"Code execution failed with error: {str(e)}", exc_info=True)
logger.error(
f"Code execution failed with error: {str(e)}",
exc_info=True,
)
return e
@ -139,17 +148,22 @@ def retry_until_success(task: str, max_retries: int = 5):
logger.info(f"Result: {result}")
if isinstance(result, Exception):
logger.error(f"Attempt {attempts + 1} failed: {str(result)}")
logger.error(
f"Attempt {attempts + 1} failed: {str(result)}"
)
print("Retrying with updated code...")
attempts += 1
else:
logger.info(f"Success on attempt {attempts + 1}. Result: {result}")
logger.info(
f"Success on attempt {attempts + 1}. Result: {result}"
)
print(f"Code executed successfully: {result}")
break
else:
logger.error("Max retries reached. Execution failed.")
print("Max retries reached. Execution failed.")
# Usage
retry_until_success(
"Write a function to fetch and display weather information from a given API."

Loading…
Cancel
Save