[REFERENCE}

pull/584/head
Your Name 4 months ago
parent b097821edd
commit b446eca147

@ -92,7 +92,7 @@ api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class # Create an instance of the OpenAIChat class
model = OpenAIChat( model = OpenAIChat(
api_key=api_key, model_name="gpt-4o-mini", temperature=0.1 openai_api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
) )
# Initialize the agent # Initialize the agent

@ -166,9 +166,10 @@ nav:
- Workflows: - Workflows:
- ConcurrentWorkflow: "swarms/structs/concurrentworkflow.md" - ConcurrentWorkflow: "swarms/structs/concurrentworkflow.md"
- SequentialWorkflow: "swarms/structs/sequential_workflow.md" - SequentialWorkflow: "swarms/structs/sequential_workflow.md"
- Structs: # - Structs:
- Conversation: "swarms/structs/conversation.md" # - Conversation: "swarms/structs/conversation.md"
- Task: "swarms/structs/task.md" # - Task: "swarms/structs/task.md"
- Full API Reference: "swarms/framework/reference.md"
- Contributing: - Contributing:
- Contributing: "swarms/contributing.md" - Contributing: "swarms/contributing.md"
- Tests: "swarms/framework/test.md" - Tests: "swarms/framework/test.md"

File diff suppressed because it is too large Load Diff

@ -35,6 +35,7 @@ agent = Agent(
context_length=200000, context_length=200000,
return_step_meta=False, return_step_meta=False,
# output_type="json", # output_type="json",
output_type=str,
) )

@ -1,126 +0,0 @@
import uuid
import time
from typing import List, Literal, Optional, Union
from pydantic import BaseModel, Field
class ModelCard(BaseModel):
"""
A Pydantic model representing a model card, which provides metadata about a machine learning model.
It includes fields like model ID, owner, and creation time.
"""
id: str
object: str = "model"
created: int = Field(default_factory=lambda: int(time.time()))
owned_by: str = "owner"
root: Optional[str] = None
parent: Optional[str] = None
permission: Optional[list] = None
class ModelList(BaseModel):
object: str = "list"
data: List[ModelCard] = []
class ImageUrl(BaseModel):
url: str
class TextContent(BaseModel):
type: Literal["text"]
text: str
class ImageUrlContent(BaseModel):
type: Literal["image_url"]
image_url: ImageUrl
ContentItem = Union[TextContent, ImageUrlContent]
class ChatMessageInput(BaseModel):
role: str = Field(
...,
description="The role of the message sender. Could be 'user', 'assistant', or 'system'.",
)
content: Union[str, List[ContentItem]]
class ChatMessageResponse(BaseModel):
role: str = Field(
...,
description="The role of the message sender. Could be 'user', 'assistant', or 'system'.",
)
content: str = None
class DeltaMessage(BaseModel):
role: Optional[Literal["user", "assistant", "system"]] = None
content: Optional[str] = None
class ChatCompletionRequest(BaseModel):
model: str = "gpt-4o"
messages: List[ChatMessageInput]
temperature: Optional[float] = 0.8
top_p: Optional[float] = 0.8
max_tokens: Optional[int] = 4000
stream: Optional[bool] = False
repetition_penalty: Optional[float] = 1.0
echo: Optional[bool] = False
class ChatCompletionResponseChoice(BaseModel):
index: int = Field(..., description="The index of the choice.")
input: str = Field(..., description="The input message.")
message: ChatMessageResponse = Field(
..., description="The output message."
)
class ChatCompletionResponseStreamChoice(BaseModel):
index: int
delta: DeltaMessage
class UsageInfo(BaseModel):
prompt_tokens: int = 0
total_tokens: int = 0
completion_tokens: Optional[int] = 0
class ChatCompletionResponse(BaseModel):
model: str
object: Literal["chat.completion", "chat.completion.chunk"]
choices: List[
Union[
ChatCompletionResponseChoice,
ChatCompletionResponseStreamChoice,
]
]
created: Optional[int] = Field(
default_factory=lambda: int(time.time())
)
class AgentChatCompletionResponse(BaseModel):
id: Optional[str] = Field(
f"agent-{uuid.uuid4().hex}",
description="The ID of the agent that generated the completion response.",
)
agent_name: Optional[str] = Field(
...,
description="The name of the agent that generated the completion response.",
)
object: Optional[
Literal["chat.completion", "chat.completion.chunk"]
] = None
choices: Optional[ChatCompletionResponseChoice] = None
created: Optional[int] = Field(
default_factory=lambda: int(time.time())
)
# full_usage: Optional[UsageInfo]

@ -1,42 +0,0 @@
# In order to accelerate the ops of creating files, we use the async file creation method.
import os
import asyncio
from aiofiles import open as aio_open
from typing import List
async def async_create_file(file_path: str, content: str) -> None:
async with aio_open(file_path, "w") as file:
await file.write(content)
async def create_multiple_files(
file_paths: List[str], contents: List[str]
) -> None:
tasks = [
async_create_file(
(file_path, content)
for file_path, content in zip(file_paths, contents)
)
]
await asyncio.gather(*tasks)
async def create_file_with_directory(
file_path: str, content: str
) -> None:
"""
Creates a file with the specified directory path and content.
Args:
file_path (str): The path of the file to be created.
content (str): The content to be written to the file.
Returns:
None
"""
directory = os.path.dirname(file_path)
if not os.path.exists(directory):
os.makedirs(directory)
await async_create_file(file_path, content)

@ -1,127 +0,0 @@
import time
from os import cpu_count
from typing import Any, Callable, List, Optional
from loguru import logger
from pathos.multiprocessing import ProcessingPool as Pool
from typing import Tuple
def execute_parallel_optimized(
callables_with_args: List[
Tuple[Callable[..., Any], Tuple[Any, ...]]
],
max_workers: Optional[int] = None,
chunk_size: Optional[int] = None,
retries: int = 3,
**kwargs,
) -> List[Any]:
"""
Executes a list of callables in parallel, leveraging all available CPU cores.
This function is optimized for high performance and reliability.
Args:
callables_with_args (List[Tuple[Callable[..., Any], Tuple[Any, ...]]]):
A list of tuples, where each tuple contains a callable and a tuple of its arguments.
max_workers (Optional[int]): The maximum number of workers to use. Defaults to the number of available cores.
chunk_size (Optional[int]): The size of chunks to split the tasks into for balanced execution. Defaults to automatic chunking.
retries (int): Number of retries for a failed task. Default is 3.
Returns:
List[Any]: A list of results from each callable. The order corresponds to the order of the input list.
Raises:
Exception: Any exception raised by the callable will be logged and re-raised after retries are exhausted.
"""
max_workers = cpu_count() if max_workers is None else max_workers
results = []
logger.info(
f"Starting optimized parallel execution of {len(callables_with_args)} tasks."
)
pool = Pool(
nodes=max_workers, **kwargs
) # Initialize the pool once
def _execute_with_retry(callable_, args, retries):
attempt = 0
while attempt < retries:
try:
result = callable_(*args)
logger.info(
f"Task {callable_} with args {args} completed successfully."
)
return result
except Exception as e:
attempt += 1
logger.warning(
f"Task {callable_} with args {args} failed on attempt {attempt}: {e}"
)
time.sleep(1) # Small delay before retrying
if attempt >= retries:
logger.error(
f"Task {callable_} with args {args} failed after {retries} retries."
)
raise
try:
if chunk_size is None:
chunk_size = (
len(callables_with_args)
// (max_workers or pool.ncpus)
or 1
)
# Use chunking and mapping for efficient execution
results = pool.map(
lambda item: _execute_with_retry(
item[0], item[1], retries
),
callables_with_args,
chunksize=chunk_size,
)
except Exception as e:
logger.critical(
f"Parallel execution failed due to an error: {e}"
)
raise
logger.info(
f"Optimized parallel execution completed. {len(results)} tasks executed."
)
pool.close() # Ensure pool is properly closed
pool.join()
# return results
# def add(a, b):
# return a + b
# def multiply(a, b):
# return a * b
# def power(a, b):
# return a**b
# # if __name__ == "__main__":
# # # List of callables with their respective arguments
# # callables_with_args = [
# # (add, (2, 3)),
# # (multiply, (5, 4)),
# # (power, (2, 10)),
# # ]
# # # Execute the callables in parallel
# # results = execute_parallel_optimized(callables_with_args)
# # # Print the results
# # print("Results:", results)

@ -1,82 +0,0 @@
import subprocess
from typing import Any, Dict, List
from loguru import logger
from pydantic import BaseModel
from swarms.structs.agent import Agent
try:
import pandas as pd
except ImportError:
logger.error("Failed to import pandas")
subprocess.run(["pip", "install", "pandas"])
import pandas as pd
def display_agents_info(agents: List[Agent]) -> None:
"""
Displays information about all agents in a list using a DataFrame.
:param agents: List of Agent instances.
"""
# Extracting relevant information from each agent
agent_data = []
for agent in agents:
try:
agent_info = {
"ID": agent.id,
"Name": agent.agent_name,
"Description": agent.description,
"max_loops": agent.max_loops,
# "Docs": agent.docs,
"System Prompt": agent.system_prompt,
"LLM Model": agent.llm.model_name, # type: ignore
}
agent_data.append(agent_info)
except AttributeError as e:
logger.error(
f"Failed to extract information from agent {agent}: {e}"
)
continue
# Creating a DataFrame to display the data
try:
df = pd.DataFrame(agent_data)
except Exception as e:
logger.error(f"Failed to create DataFrame: {e}")
return
# Displaying the DataFrame
try:
print(df)
except Exception as e:
logger.error(f"Failed to print DataFrame: {e}")
def dict_to_dataframe(data: Dict[str, Any]) -> pd.DataFrame:
"""
Converts a dictionary into a pandas DataFrame.
:param data: Dictionary to convert.
:return: A pandas DataFrame representation of the dictionary.
"""
# Convert dictionary to DataFrame
df = pd.json_normalize(data)
return df
def pydantic_model_to_dataframe(model: BaseModel) -> pd.DataFrame:
"""
Converts a Pydantic Base Model into a pandas DataFrame.
:param model: Pydantic Base Model to convert.
:return: A pandas DataFrame representation of the Pydantic model.
"""
# Convert Pydantic model to dictionary
model_dict = model.dict()
# Convert dictionary to DataFrame
df = dict_to_dataframe(model_dict)
return df

@ -1,98 +0,0 @@
from functools import wraps
from loguru import logger
import tracemalloc
import psutil
import time
from typing import Callable, Any
def profile_all(func: Callable) -> Callable:
"""
A decorator to profile memory usage, CPU usage, and I/O operations
of a function and log the data using loguru.
It combines tracemalloc for memory profiling, psutil for CPU and I/O operations,
and measures execution time.
Args:
func (Callable): The function to be profiled.
Returns:
Callable: The wrapped function with profiling enabled.
"""
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
# Start memory tracking
tracemalloc.start()
# Get initial CPU stats
process = psutil.Process()
initial_cpu_times = process.cpu_times()
# Get initial I/O stats if available
try:
initial_io_counters = process.io_counters()
io_tracking_available = True
except AttributeError:
logger.warning(
"I/O counters not available on this platform."
)
io_tracking_available = False
# Start timing the function execution
start_time = time.time()
# Execute the function
result = func(*args, **kwargs)
# Stop timing
end_time = time.time()
execution_time = end_time - start_time
# Get final CPU stats
final_cpu_times = process.cpu_times()
# Get final I/O stats if available
if io_tracking_available:
final_io_counters = process.io_counters()
io_read_count = (
final_io_counters.read_count
- initial_io_counters.read_count
)
io_write_count = (
final_io_counters.write_count
- initial_io_counters.write_count
)
else:
io_read_count = io_write_count = 0
# Get memory usage statistics
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics("lineno")
# Calculate CPU usage
cpu_usage = (
final_cpu_times.user
- initial_cpu_times.user
+ final_cpu_times.system
- initial_cpu_times.system
)
# Log the data
logger.info(f"Execution time: {execution_time:.4f} seconds")
logger.info(f"CPU usage: {cpu_usage:.2f} seconds")
if io_tracking_available:
logger.info(
f"I/O Operations - Read: {io_read_count}, Write: {io_write_count}"
)
logger.info("Top memory usage:")
for stat in top_stats[:10]:
logger.info(stat)
# Stop memory tracking
tracemalloc.stop()
return result
return wrapper

@ -0,0 +1,363 @@
import os
from typing import List
from dotenv import load_dotenv
from loguru import logger
from swarm_models import OpenAIChat
from swarms import Agent
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Set
load_dotenv()
# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")
# Create an instance of the OpenAIChat class
SYS_PROMPT = """
### System Prompt for API Reference Documentation Generator
You are an expert documentation generator agent. Your task is to produce **high-quality Python API reference documentation** for functions and classes in Python codebases. The codebase does **not include any web APIs**, only Python functions, methods, classes, and constants. You will generate clear, concise, and professional documentation based on the structure and functionality of the given code.
Don't use the one hashtag for the title, only use 3 hashtags for the module path
**Instructions:**
1. **Documentation Style**: Follow a consistent format for documenting Python functions and classes.
- For functions, provide:
- **Name** of the function.
- **Description** of what the function does.
- **Parameters** with type annotations and a description for each parameter.
- **Return Type** and a description of what is returned.
- **Example Usage** in code block format.
- For classes, provide:
- **Name** of the class.
- **Description** of the class and its purpose.
- **Attributes** with a description of each attribute and its type.
- **Methods** with the same details as functions (description, parameters, return types).
- **Example Usage** in code block format.
- For constants, briefly describe their purpose and value.
2. **Many-shot examples**:
- Provide multiple examples of documenting both **functions** and **classes** based on the given code.
### Many-Shot Examples:
#### Example 1: Function Documentation
```python
def add_numbers(a: int, b: int) -> int:
return a + b
```
**Documentation:**
### `add_numbers(a: int, b: int) -> int`
**Description**:
Adds two integers and returns their sum.
**Parameters**:
- `a` (`int`): The first integer.
- `b` (`int`): The second integer.
**Return**:
- (`int`): The sum of the two input integers.
**Example**:
```python
result = add_numbers(3, 5)
print(result) # Output: 8
```
#### Example 2: Function Documentation
```python
def greet_user(name: str) -> str:
return f"Hello, {name}!"
```
**Documentation:**
### `greet_user(name: str) -> str`
**Description**:
Returns a greeting message for the given user.
**Parameters**:
- `name` (`str`): The name of the user to greet.
**Return**:
- (`str`): A personalized greeting message.
**Example**:
```python
message = greet_user("Alice")
print(message) # Output: "Hello, Alice!"
```
#### Example 3: Class Documentation
```python
class Calculator:
def __init__(self):
self.result = 0
def add(self, value: int) -> None:
self.result += value
def reset(self) -> None:
self.result = 0
```
**Documentation:**
### `Calculator`
**Description**:
A simple calculator class that can add numbers and reset the result.
**Attributes**:
- `result` (`int`): The current result of the calculator, initialized to 0.
**Methods**:
- `add(value: int) -> None`
- **Description**: Adds the given value to the current result.
- **Parameters**:
- `value` (`int`): The value to add to the result.
- **Return**: None.
- `reset() -> None`
- **Description**: Resets the calculator result to 0.
- **Parameters**: None.
- **Return**: None.
**Example**:
```python
calc = Calculator()
calc.add(5)
print(calc.result) # Output: 5
calc.reset()
print(calc.result) # Output: 0
```
#### Example 4: Constant Documentation
```python
PI = 3.14159
```
**Documentation:**
### `PI`
**Description**:
A constant representing the value of pi (π) to 5 decimal places.
**Value**:
`3.14159`
"""
class DocumentationAgent:
def __init__(
self,
directory: str,
output_file: str = "API_Reference.md",
agent_name: str = "Documentation-Generator",
):
"""
Initializes the DocumentationAgent.
:param directory: The root directory where the Python files are located.
:param output_file: The file where all the documentation will be saved.
:param agent_name: Name of the agent generating the documentation.
"""
self.directory = directory
self.output_file = output_file
self.agent_name = agent_name
self.model = OpenAIChat(
openai_api_key=api_key,
model_name="gpt-4o-mini",
temperature=0.1,
max_tokens=3000,
)
self.agent = Agent(
agent_name=agent_name,
system_prompt=SYS_PROMPT,
llm=self.model,
max_loops=1,
autosave=True,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path=f"{agent_name}_state.json",
user_name="swarms_corp",
retry_attempts=1,
context_length=200000,
return_step_meta=False,
output_type=str,
)
self.documented_files: Set[str] = (
set()
) # Memory system to store documented files
logger.info(
f"Initialized {self.agent_name} for generating API documentation."
)
# Ensure the output file is clean before starting
with open(self.output_file, "w") as f:
f.write("# API Reference Documentation\n\n")
logger.info(f"Created new output file: {self.output_file}")
def _get_python_files(self) -> List[str]:
"""
Gets all Python (.py) files in the given directory, excluding 'utils', 'tools', and 'prompts' directories.
:return: A list of full paths to Python files.
"""
excluded_folders = {
"utils",
"tools",
"prompts",
"cli",
"schemas",
"agents",
"artifacts",
}
python_files = []
for root, dirs, files in os.walk(self.directory):
# Remove excluded folders from the search
dirs[:] = [d for d in dirs if d not in excluded_folders]
for file in files:
if file.endswith(".py"):
full_path = os.path.join(root, file)
python_files.append(full_path)
logger.info(f"Found Python file: {full_path}")
return python_files
def _get_module_path(self, file_path: str) -> str:
"""
Converts a file path to a Python module path.
:param file_path: Full path to the Python file.
:return: The module path for the file.
"""
relative_path = os.path.relpath(file_path, self.directory)
module_path = relative_path.replace(os.sep, ".").replace(
".py", ""
)
logger.info(f"Formatted module path: {module_path}")
return module_path
def _read_file_content(self, file_path: str) -> str:
"""
Reads the content of a Python file.
:param file_path: Full path to the Python file.
:return: The content of the file as a string.
"""
try:
with open(file_path, "r") as f:
content = f.read()
logger.info(f"Read content from {file_path}")
return content
except Exception as e:
logger.error(f"Error reading file {file_path}: {e}")
return ""
def _write_to_markdown(self, content: str) -> None:
"""
Appends generated content to the output markdown file.
:param content: Documentation content to write to the markdown file.
"""
try:
with open(self.output_file, "a") as f:
f.write(content)
f.write(
"\n\n"
) # Add space between different module documentations
logger.info(
f"Appended documentation to {self.output_file}"
)
except Exception as e:
logger.error(f"Error writing to {self.output_file}: {e}")
def _generate_doc_for_file(self, file_path: str) -> None:
"""
Generates documentation for a single Python file.
:param file_path: The full path to the Python file.
"""
if file_path in self.documented_files:
logger.info(
f"Skipping already documented file: {file_path}"
)
return
module_path = self._get_module_path(file_path)
file_content = self._read_file_content(file_path)
if (
file_content.strip()
): # Ensure the file isn't empty or just whitespace
logger.info(
f"Generating documentation for module {module_path}..."
)
# Updated task prompt to give clearer instructions
task_prompt = f"""
You are an expert documentation generator. Generate a comprehensive Python API reference documentation for the following module '{module_path}'.
The module contains the following code:
{file_content}
Provide full documentation including descriptions for all functions, classes, and methods. If there is nothing to document, simply write "No documentable code".
Make sure you provide full module imports in your documentation such as {self.directory}.{module_path}
from {self.directory}.{module_path} import *
### `{self.directory}.{module_path}`
"""
doc_output = self.agent.run(task_prompt)
# Add a section for the subfolder (if any)
# markdown_content = f"# {subfolder}\n\n" if subfolder else ""
markdown_content = f"\n\n{doc_output}\n"
self._write_to_markdown(markdown_content)
self.documented_files.add(file_path)
def run(self) -> None:
"""
Generates documentation for all Python files in the directory and writes it to a markdown file using multithreading.
"""
python_files = self._get_python_files()
# with ThreadPoolExecutor() as executor:
# futures = [executor.submit(self._generate_doc_for_file, file_path) for file_path in python_files]
# for future in as_completed(futures):
# try:
# future.result() # Raises an exception if the function failed
# except Exception as e:
# logger.error(f"Error processing a file: {e}")
for file in python_files:
self._generate_doc_for_file(file)
logger.info(
f"Documentation generation completed. All documentation written to {self.output_file}"
)
# Example usage
if __name__ == "__main__":
doc_agent = DocumentationAgent(directory="swarms")
doc_agent.run()
Loading…
Cancel
Save