# `Agent`
Swarm Agent is a powerful autonomous agent framework designed to connect Language Models (LLMs) with various tools and long-term memory. This class provides the ability to ingest and process various types of documents such as PDFs, text files, Markdown files, JSON files, and more. The Agent structure offers a wide range of features to enhance the capabilities of LLMs and facilitate efficient task execution.
## Architecture
```mermaid
graph TD
L -->|Proceeds to Final LLM Processing| I
```
The `Agent` class is the core component of the Swarm Agent framework. It serves as an autonomous agent that bridges Language Models (LLMs) with external tools and long-term memory systems. The class is designed to handle a variety of document types—including PDFs, text files, Markdown, and JSON—enabling robust document ingestion and processing. By integrating these capabilities, the `Agent` class empowers LLMs to perform complex tasks, utilize external resources, and manage information efficiently, making it a versatile solution for advanced autonomous workflows.
## Features
The `Agent` class establishes a conversational loop with a language model, allowing for interactive task execution, feedback collection, and dynamic response generation. It includes features such as:
| Feature | Description |
|------------------------------------------|--------------------------------------------------------------------------------------------------|
| **Conversational Loop** | Enables back-and-forth interaction with the model. |
| **Feedback Collection** | Allows users to provide feedback on generated responses. |
| **Stoppable Conversation** | Supports custom stopping conditions for the conversation. |
| **Retry Mechanism** | Implements a retry system for handling issues in response generation. |
| **Tool Integration** | Supports the integration of various tools for enhanced capabilities. |
| **Long-term Memory Management** | Incorporates vector databases for efficient information retrieval. |
| **Document Ingestion** | Processes various document types for information extraction. |
| **Interactive Mode** | Allows real-time communication with the agent. |
| **Sentiment Analysis** | Evaluates the sentiment of generated responses. |
| **Output Filtering and Cleaning** | Ensures generated responses meet specific criteria. |
| **Asynchronous and Concurrent Execution**| Supports efficient parallelization of tasks. |
| **Planning and Reasoning** | Implements techniques like algorithm of thoughts for enhanced decision-making. |
## `Agent` Attributes
| Attribute | Type | Description |
|-----------|------|-------------|
| `id` | `Optional[str]` | Unique identifier for the agent instance. |
| `llm` | `Optional[Any]` | Language model instance used by the agent. |
| `template` | `Optional[str]` | Template used for formatting responses. |
| `max_loops` | `Optional[Union[int, str]]` | Maximum number of loops the agent can run. |
| `stopping_condition` | `Optional[Callable[[str], bool]]` | Callable function determining when to stop looping. |
| `loop_interval` | `Optional[int]` | Interval (in seconds) between loops. |
| `retry_attempts` | `Optional[int]` | Number of retry attempts for failed LLM calls. |
| `retry_interval` | `Optional[int]` | Interval (in seconds) between retry attempts. |
| `return_history` | `Optional[bool]` | Boolean indicating whether to return conversation history. |
| `stopping_token` | `Optional[str]` | Token that stops the agent from looping when present in the response. |
| `dynamic_loops` | `Optional[bool]` | Boolean indicating whether to dynamically determine the number of loops. |
| `interactive` | `Optional[bool]` | Boolean indicating whether to run in interactive mode. |
| `dashboard` | `Optional[bool]` | Boolean indicating whether to display a dashboard. |
| `agent_name` | `Optional[str]` | Name of the agent instance. |
| `agent_description` | `Optional[str]` | Description of the agent instance. |
| `system_prompt` | `Optional[str]` | System prompt used to initialize the conversation. |
| `tools` | `List[Callable]` | List of callable functions representing tools the agent can use. |
| `dynamic_temperature_enabled` | `Optional[bool]` | Boolean indicating whether to dynamically adjust the LLM's temperature. |
| `sop` | `Optional[str]` | Standard operating procedure for the agent. |
| `sop_list` | `Optional[List[str]]` | List of strings representing the standard operating procedure. |
| `saved_state_path` | `Optional[str]` | File path for saving and loading the agent's state. |
| `autosave` | `Optional[bool]` | Boolean indicating whether to automatically save the agent's state. |
| `context_length` | `Optional[int]` | Maximum length of the context window (in tokens) for the LLM. |
| `user_name` | `Optional[str]` | Name used to represent the user in the conversation. |
| `self_healing_enabled` | `Optional[bool]` | Boolean indicating whether to attempt self-healing in case of errors. |
| `code_interpreter` | `Optional[bool]` | Boolean indicating whether to interpret and execute code snippets. |
| `multi_modal` | `Optional[bool]` | Boolean indicating whether to support multimodal inputs. |
| `pdf_path` | `Optional[str]` | File path of a PDF document to be ingested. |
| `list_of_pdf` | `Optional[str]` | List of file paths for PDF documents to be ingested. |
| `tokenizer` | `Optional[Any]` | Instance of a tokenizer used for token counting and management. |
| `long_term_memory` | `Optional[Union[Callable, Any]]` | Instance of a `BaseVectorDatabase` implementation for long-term memory management. |
| `preset_stopping_token` | `Optional[bool]` | Boolean indicating whether to use a preset stopping token. |
| `traceback` | `Optional[Any]` | Object used for traceback handling. |
| `traceback_handlers` | `Optional[Any]` | List of traceback handlers. |
| `streaming_on` | `Optional[bool]` | Boolean indicating whether to stream responses. |
| `docs` | `List[str]` | List of document paths or contents to be ingested. |
| `docs_folder` | `Optional[str]` | Path to a folder containing documents to be ingested. |
| `verbose` | `Optional[bool]` | Boolean indicating whether to print verbose output. |
| `parser` | `Optional[Callable]` | Callable function used for parsing input data. |
| `best_of_n` | `Optional[int]` | Integer indicating the number of best responses to generate. |
| `callback` | `Optional[Callable]` | Callable function to be called after each agent loop. |
| `metadata` | `Optional[Dict[str, Any]]` | Dictionary containing metadata for the agent. |
| `callbacks` | `Optional[List[Callable]]` | List of callable functions to be called during execution. |
| `search_algorithm` | `Optional[Callable]` | Callable function for long-term memory retrieval. |
| `logs_to_filename` | `Optional[str]` | File path for logging agent activities. |
| `evaluator` | `Optional[Callable]` | Callable function for evaluating the agent's responses. |
| `stopping_func` | `Optional[Callable]` | Callable function used as a stopping condition. |
| `custom_loop_condition` | `Optional[Callable]` | Callable function used as a custom loop condition. |
| `sentiment_threshold` | `Optional[float]` | Float value representing the sentiment threshold for evaluating responses. |
| `custom_exit_command` | `Optional[str]` | String representing a custom command for exiting the agent's loop. |
| `sentiment_analyzer` | `Optional[Callable]` | Callable function for sentiment analysis on outputs. |
| `limit_tokens_from_string` | `Optional[Callable]` | Callable function for limiting the number of tokens in a string. |
| `custom_tools_prompt` | `Optional[Callable]` | Callable function for generating a custom prompt for tool usage. |
| `tool_schema` | `ToolUsageType` | Data structure representing the schema for the agent's tools. |
| `output_type` | `OutputType` | Type representing the expected output type of responses. |
| `function_calling_type` | `str` | String representing the type of function calling. |
| `output_cleaner` | `Optional[Callable]` | Callable function for cleaning the agent's output. |
| `function_calling_format_type` | `Optional[str]` | String representing the format type for function calling. |
| `list_base_models` | `Optional[List[BaseModel]]` | List of base models used for generating tool schemas. |
| `metadata_output_type` | `str` | String representing the output type for metadata. |
| `state_save_file_type` | `str` | String representing the file type for saving the agent's state. |
| `chain_of_thoughts` | `bool` | Boolean indicating whether to use the chain of thoughts technique. |
| `algorithm_of_thoughts` | `bool` | Boolean indicating whether to use the algorithm of thoughts technique. |
| `tree_of_thoughts` | `bool` | Boolean indicating whether to use the tree of thoughts technique. |
| `tool_choice` | `str` | String representing the method for tool selection. |
| `execute_tool` | `bool` | Boolean indicating whether to execute tools. |
| `rules` | `str` | String representing the rules for the agent's behavior. |
| `planning` | `Optional[str]` | Boolean indicating whether to perform planning. |
| `planning_prompt` | `Optional[str]` | String representing the prompt for planning. |
| `device` | `str` | String representing the device on which the agent should run. |
| `custom_planning_prompt` | `str` | String representing a custom prompt for planning. |
| `memory_chunk_size` | `int` | Integer representing the maximum size of memory chunks for long-term memory retrieval. |
| `agent_ops_on` | `bool` | Boolean indicating whether agent operations should be enabled. |
| `return_step_meta` | `Optional[bool]` | Boolean indicating whether to return JSON of all steps and additional metadata. |
| `time_created` | `Optional[str]` | Float representing the time the agent was created. |
| `tags` | `Optional[List[str]]` | Optional list of strings for tagging the agent. |
| `use_cases` | `Optional[List[Dict[str, str]]]` | Optional list of dictionaries describing use cases for the agent. |
| `step_pool` | `List[Step]` | List of Step objects representing the agent's execution steps. |
| `print_every_step` | `Optional[bool]` | Boolean indicating whether to print every step of execution. |
| `agent_output` | `ManySteps` | ManySteps object containing the agent's output and metadata. |
| `data_memory` | `Optional[Callable]` | Optional callable for data memory operations. |
| `load_yaml_path` | `str` | String representing the path to a YAML file for loading configurations. |
| `auto_generate_prompt` | `bool` | Boolean indicating whether to automatically generate prompts. |
| `rag_every_loop` | `bool` | Boolean indicating whether to query RAG database for context on every loop |
| `plan_enabled` | `bool` | Boolean indicating whether planning functionality is enabled |
| `artifacts_on` | `bool` | Boolean indicating whether to save artifacts from agent execution |
| `artifacts_output_path` | `str` | File path where artifacts should be saved |
| `artifacts_file_extension` | `str` | File extension to use for saved artifacts |
| `all_cores` | `bool` | Boolean indicating whether to use all CPU cores |
| `device_id` | `int` | ID of the GPU device to use if running on GPU |
| `scheduled_run_date` | `Optional[datetime]` | Optional datetime for scheduling future agent runs |
| `do_not_use_cluster_ops` | `bool` | Boolean indicating whether to avoid cluster operations |
| `all_gpus` | `bool` | Boolean indicating whether to use all available GPUs |
| `model_name` | `str` | String representing the name of the model to use |
| `llm_args` | `dict` | Dictionary containing additional arguments for the LLM |
| `load_state_path` | `str` | String representing the path to load state from |
| `role` | `agent_roles` | String representing the role of the agent (e.g., "worker") |
| `print_on` | `bool` | Boolean indicating whether to print output |
| `tools_list_dictionary` | `Optional[List[Dict[str, Any]]]` | List of dictionaries representing tool schemas |
| `mcp_url` | `Optional[Union[str, MCPConnection]]` | String or MCPConnection representing the MCP server URL |
| `mcp_urls` | `List[str]` | List of strings representing multiple MCP server URLs |
| `react_on` | `bool` | Boolean indicating whether to enable ReAct reasoning |
| `safety_prompt_on` | `bool` | Boolean indicating whether to enable safety prompts |
| `random_models_on` | `bool` | Boolean indicating whether to randomly select models |
| `mcp_config` | `Optional[MCPConnection]` | MCPConnection object containing MCP configuration |
| `top_p` | `Optional[float]` | Float representing the top-p sampling parameter |
| `conversation_schema` | `Optional[ConversationSchema]` | ConversationSchema object for conversation formatting |
| `llm_base_url` | `Optional[str]` | String representing the base URL for the LLM API |
| `llm_api_key` | `Optional[str]` | String representing the API key for the LLM |
| `rag_config` | `RAGConfig` | RAGConfig object containing RAG configuration |
| `tool_call_summary` | `bool` | Boolean indicating whether to summarize tool calls |
| `output_raw_json_from_tool_call` | `bool` | Boolean indicating whether to output raw JSON from tool calls |
| `summarize_multiple_images` | `bool` | Boolean indicating whether to summarize multiple image outputs |
| `tool_retry_attempts` | `int` | Integer representing the number of retry attempts for tool execution |
| `reasoning_prompt_on` | `bool` | Boolean indicating whether to enable reasoning prompts |
| `dynamic_context_window` | `bool` | Boolean indicating whether to dynamically adjust context window |
| `show_tool_execution_output` | `bool` | Boolean indicating whether to show tool execution output |
| `created_at` | `float` | Float representing the timestamp when the agent was created |
| `workspace_dir` | `str` | String representing the workspace directory for the agent |
| `timeout` | `Optional[int]` | Integer representing the timeout for operations in seconds |
| `temperature` | `float` | Float representing the temperature for the LLM |
| `max_tokens` | `int` | Integer representing the maximum number of tokens |
| `frequency_penalty` | `float` | Float representing the frequency penalty |
| `presence_penalty` | `float` | Float representing the presence penalty |
| `tool_system_prompt` | `str` | String representing the system prompt for tools |
| `log_directory` | `str` | String representing the directory for logs |
## `Agent` Methods
| Method | Description | Inputs | Usage Example |
|--------|-------------|--------|----------------|
| `run(task, img=None, imgs=None, correct_answer=None, streaming_callback=None, *args, **kwargs)` | Runs the autonomous agent loop to complete the given task with enhanced parameters. | `task` (str): The task to be performed.<br>`img` (str, optional): Path to a single image file.<br>`imgs` (List[str], optional): List of image paths for batch processing.<br>`correct_answer` (str, optional): Expected correct answer for validation with automatic retries.<br>`streaming_callback` (Callable, optional): Callback function for real-time token streaming.<br>`*args`, `**kwargs`: Additional arguments. | `response = agent.run("Generate a report on financial performance.")` |
| `run_batched(tasks, imgs=None, *args, **kwargs)` | Runs multiple tasks concurrently in batch mode. | `tasks` (List[str]): List of tasks to run.<br>`imgs` (List[str], optional): List of images to process.<br>`*args`, `**kwargs`: Additional arguments. | `responses = agent.run_batched(["Task 1", "Task 2"])` |
| `run_multiple_images(task, imgs, *args, **kwargs)` | Runs the agent with multiple images using concurrent processing. | `task` (str): The task to perform on each image.<br>`imgs` (List[str]): List of image paths or URLs.<br>`*args`, `**kwargs`: Additional arguments. | `outputs = agent.run_multiple_images("Describe image", ["img1.jpg", "img2.png"])` |
| `continuous_run_with_answer(task, img=None, correct_answer=None, max_attempts=10)` | Runs the agent until the correct answer is provided. | `task` (str): The task to perform.<br>`img` (str, optional): Image to process.<br>`correct_answer` (str): Expected answer.<br>`max_attempts` (int): Maximum attempts. | `response = agent.continuous_run_with_answer("Math problem", correct_answer="42")` |
| `tool_execution_retry(response, loop_count)` | Executes tools with retry logic for handling failures. | `response` (any): Response containing tool calls.<br>`loop_count` (int): Current loop number. | `agent.tool_execution_retry(response, 1)` |
| `__call__(task, img=None, *args, **kwargs)` | Alternative way to call the `run` method. | Same as `run`. | `response = agent("Generate a report on financial performance.")` |
| `parse_and_execute_tools(response, *args, **kwargs)` | Parses the agent's response and executes any tools mentioned in it. | `response` (str): The agent's response to be parsed.<br>`*args`, `**kwargs`: Additional arguments. | `agent.parse_and_execute_tools(response)` |
| `add_memory(message)` | Adds a message to the agent's memory. | `message` (str): The message to add. | `agent.add_memory("Important information")` |
## `Agent.run(*args, **kwargs)`
The `run` method has been significantly enhanced with new parameters for advanced functionality:
| Method | Description | Inputs | Usage Example |
|--------|-------------|--------|----------------|
| `run(task, img=None, imgs=None, correct_answer=None, streaming_callback=None, *args, **kwargs)` | Runs the agent with enhanced parameters | `task` (str): Task to run<br>`img` (str, optional): Single image path<br>`imgs` (List[str], optional): List of image paths<br>`correct_answer` (str, optional): Expected answer for validation<br>`streaming_callback` (Callable, optional): Callback for streaming tokens<br>`*args`, `**kwargs`: Additional arguments | `agent.run("Analyze data", imgs=["img1.jpg", "img2.png"])` |
### Method Signature
```python
def run(
    self,
    task: Optional[Union[str, Any]] = None,
    img: Optional[str] = None,
    imgs: Optional[List[str]] = None,
    correct_answer: Optional[str] = None,
    streaming_callback: Optional[Callable[[str], None]] = None,
    *args,
    **kwargs,
) -> Any:
```
### Parameters
| Parameter | Type | Description | Default |
|-----------|------|-------------|---------|
| `task` | `Optional[Union[str, Any]]` | The task to be executed | `None` |
| `img` | `Optional[str]` | Path to a single image file | `None` |
| `imgs` | `Optional[List[str]]` | List of image paths for batch processing | `None` |
| `correct_answer` | `Optional[str]` | Expected correct answer for validation with automatic retries | `None` |
| `streaming_callback` | `Optional[Callable[[str], None]]` | Callback function to receive streaming tokens in real-time | `None` |
| `*args` | `Any` | Additional positional arguments | - |
| `**kwargs` | `Any` | Additional keyword arguments | - |
## Getting Started
### Examples
To use the Swarm Agent, first install the required dependencies:

```bash
pip3 install -U swarms
```

Then, you can initialize and use the agent as follows:

```python
from swarms.structs.agent import Agent
from swarms.prompts.finance_agent_sys_prompt import FINANCIAL_AGENT_SYS_PROMPT

# Initialize the Financial Analysis Agent with GPT-4o-mini model
agent = Agent(
    agent_name="Financial-Analysis-Agent",
    system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
    model_name="gpt-4o-mini",
    max_loops=1,
    autosave=True,
    dashboard=False,
    verbose=True,
    dynamic_temperature_enabled=True,
    saved_state_path="finance_agent.json",
    user_name="swarms_corp",
    retry_attempts=1,
    context_length=200000,
    return_step_meta=False,
    output_type="str",
)

# Run the agent
response = agent.run(
    "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria?"
)
print(response)
```

### Enhanced Run Method Examples

```python
# Basic Usage
# Simple task execution
response = agent.run("Generate a report on financial performance.")

# Single Image Processing
# Process a single image
response = agent.run(
    task="Analyze this image and describe what you see",
    img="path/to/image.jpg"
)

# Multiple Image Processing
# Process multiple images concurrently
response = agent.run(
    task="Analyze these images and identify common patterns",
    imgs=["image1.jpg", "image2.png", "image3.jpeg"]
)

# Answer Validation with Retries
# Run until the correct answer is found
response = agent.run(
    task="What is the capital of France?",
    correct_answer="Paris"
)

# Real-time Streaming
def streaming_callback(token: str):
    print(token, end="", flush=True)

response = agent.run(
    task="Tell me a long story about space exploration",
    streaming_callback=streaming_callback
)

# Combined Parameters
# Complex task with multiple features
response = agent.run(
    task="Analyze these financial charts and provide insights",
    imgs=["chart1.png", "chart2.png", "chart3.png"],
    correct_answer="market volatility",
    streaming_callback=streaming_callback
)
```
## Advanced Usage
### Return Types
The `run` method returns different types based on the input parameters:
| Scenario | Return Type | Description |
|-----------------------|-----------------------------------------------|---------------------------------------------------------|
| Single task | `str` | Returns the agent's response |
| Multiple images | `List[Any]` | Returns a list of results, one for each image |
| Answer validation | `str` | Returns the correct answer as a string |
| Streaming | `str` | Returns the complete response after streaming completes |
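A quick sketch of handling these return shapes (assuming an already-initialized `agent` and local image files):

```python
# Single task: a plain string response
report = agent.run("Summarize the quarterly report.")
print(report)

# Multiple images: a list with one result per image, in input order
image_paths = ["img1.jpg", "img2.png"]
image_results = agent.run(
    task="Describe each image",
    imgs=image_paths,
)
for path, output in zip(image_paths, image_results):
    print(f"{path}: {output}")
```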
## Advanced Capabilities
### Tool Integration
The `Agent` class allows seamless integration of external tools by accepting a list of Python functions via the `tools` parameter during initialization. Each tool function must include type annotations and a docstring. The `Agent` class automatically converts these functions into an OpenAI-compatible function calling schema, making them accessible for use during task execution.
Learn more about tools [here](https://docs.swarms.world/en/latest/swarms/tools/tools_examples/)
## Requirements for a tool
| Requirement | Description |
|---------------------|------------------------------------------------------------------|
| Function | The tool must be a Python function. |
| With types | The function must have type annotations for its parameters. |
| With doc strings | The function must include a docstring describing its behavior. |
| Must return a string| The function must return a string value. |
```python
from swarms import Agent
import subprocess
def terminal(code: str):
    """
    Run code in the terminal.

    Args:
        code (str): The code to run in the terminal.

    Returns:
        str: The output of the terminal command.
    """
    out = subprocess.run(
        code, shell=True, capture_output=True, text=True
    ).stdout
    return str(out)

# Initialize the agent with a tool
agent = Agent(
    agent_name="Terminal-Agent",
    model_name="claude-sonnet-4-20250514",
    tools=[terminal],
    system_prompt="You are an agent that can execute terminal commands. Use the tools provided to assist the user.",
)
```

### Long-term Memory Management
The Swarm Agent supports integration with vector databases for long-term memory management. The following example uses ChromaDB:
```python
from swarms import Agent
from swarms_memory import ChromaDB
# Initialize ChromaDB
chromadb = ChromaDB(
    metric="cosine",
    output_dir="finance_agent_rag",
)

# Initialize the agent with long-term memory
agent = Agent(
    agent_name="Financial-Analysis-Agent",
    model_name="claude-sonnet-4-20250514",
    long_term_memory=chromadb,
    system_prompt="You are a financial analysis agent with access to long-term memory.",
)
```

### Interactive Mode
To enable interactive mode, set the `interactive` parameter to `True` when initializing the `Agent`:
```python
agent = Agent(
    agent_name="Interactive-Agent",
    model_name="claude-sonnet-4-20250514",
    interactive=True,
    system_prompt="You are an interactive agent. Engage in a conversation with the user.",
)

agent.run("Let's start a conversation")
```
### Sentiment Analysis
To perform sentiment analysis on the agent's outputs, you can provide a sentiment analyzer function:
```python
from textblob import TextBlob
def sentiment_analyzer(text):
    analysis = TextBlob(text)
    return analysis.sentiment.polarity

agent = Agent(
    agent_name="Sentiment-Analysis-Agent",
    model_name="claude-sonnet-4-20250514",
    sentiment_analyzer=sentiment_analyzer,
    sentiment_threshold=0.5,
    system_prompt="You are an agent that generates responses with sentiment analysis.",
)

response = agent.run("Generate a positive statement about AI")
print(response)
```
### Batch Processing
The new `run_batched` method allows you to process multiple tasks efficiently:
#### Method Signature
```python
def run_batched(
    self,
    tasks: List[str],
    imgs: List[str] = None,
    *args,
    **kwargs,
) -> List[Any]:
```
#### Parameters
| Parameter | Type | Description | Default |
|-----------|------|-------------|---------|
| `tasks` | `List[str]` | List of tasks to run concurrently | Required |
| `imgs` | `List[str]` | List of images to process with each task | `None` |
| `*args` | `Any` | Additional positional arguments | - |
| `**kwargs` | `Any` | Additional keyword arguments | - |
#### Usage Examples
```python
# Process multiple tasks in batch
tasks = [
    "Analyze Q1 financial performance",
    "Generate Q2 projections",
    "Create executive summary"
]

# Run all tasks concurrently; results come back in the same order as the input
batch_results = agent.run_batched(tasks)

for i, (task, result) in enumerate(zip(tasks, batch_results)):
    print(f"Task {i + 1}: {task}")
    print(f"Result: {result}\n")
```
#### Batch Processing with Images
```python
# Process multiple tasks with multiple images
tasks = [
    "Analyze this chart for trends",
    "Identify patterns in this data visualization",
    "Summarize the key insights from this graph"
]
images = ["chart1.png", "chart2.png", "chart3.png"]
# Each task will process all images
batch_results = agent.run_batched(tasks, imgs=images)
```
#### Return Type
- **Returns**: `List[Any]` - List of results from each task execution
- **Order**: Results are returned in the same order as the input tasks
### Various other settings
```python
agent.model_dump_json()
print(agent.to_toml())
```
## Examples
### Auto Generate Prompt + CPU Execution
```python
import os
from swarms import Agent
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Initialize the agent with automated prompt engineering enabled
agent = Agent(
    agent_name="Financial-Analysis-Agent",
    system_prompt=None,  # System prompt is dynamically generated
    agent_description=None,
    model_name="gpt-4.1",
    max_loops=1,
    auto_generate_prompt=True,  # generate the system prompt from the task
)

# Run the agent with a task description and specify the device
agent.run(
    "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria",
    # Will design a system prompt based on the task if description and system prompt are None
    device="cpu",
)
# Print the dynamically generated system prompt
print(agent.system_prompt)
```
## Agent Structured Outputs
- Create a structured output schema for the agent as a list of dictionaries (`List[Dict]`)
- Pass it to the agent via the `tools_list_dictionary` parameter
- The agent returns the structured output as a JSON-formatted string
- Use the `str_to_dict` function to convert that output into a dictionary
```python
from dotenv import load_dotenv
print(type(str_to_dict(out)))
```
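Putting these pieces together, a fuller sketch of the structured-output flow might look like the following; the tool schema, model name, and the `swarms.utils.str_to_dict` import path are illustrative assumptions rather than values taken from the original example:

```python
from dotenv import load_dotenv
from swarms import Agent
from swarms.utils.str_to_dict import str_to_dict

load_dotenv()

# Illustrative structured-output schema passed via tools_list_dictionary
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_stock_price",
            "description": "Retrieve the current price of a stock ticker.",
            "parameters": {
                "type": "object",
                "properties": {
                    "ticker": {
                        "type": "string",
                        "description": "Stock ticker symbol, e.g. AAPL",
                    }
                },
                "required": ["ticker"],
            },
        },
    }
]

agent = Agent(
    agent_name="Structured-Output-Agent",
    model_name="gpt-4.1",
    max_loops=1,
    tools_list_dictionary=tools,
)

out = agent.run("What is the current price of AAPL?")
print(out)                     # JSON-formatted string
print(type(str_to_dict(out)))  # <class 'dict'>
```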
## Comprehensive Agent Configuration Examples
### Advanced Agent with All New Features
```python
from swarms import Agent
from swarms_memory import ChromaDB
from datetime import datetime, timedelta
# Initialize advanced agent with comprehensive configuration
agent = Agent(
    # Basic Configuration
    agent_name="Advanced-Analysis-Agent",
    agent_description="Multi-modal analysis agent with advanced capabilities",
    system_prompt="You are an advanced analysis agent capable of processing multiple data types.",

    # Enhanced Run Parameters
    max_loops=3,
    dynamic_loops=True,
    interactive=False,
    dashboard=True,

    # Device and Resource Management
    device="gpu",
    device_id=0,
    all_cores=True,
    all_gpus=False,
    do_not_use_cluster_ops=True,

    # Memory and Context Management
    context_length=100000,
    memory_chunk_size=3000,
    dynamic_context_window=True,
    rag_every_loop=True,

    # Advanced Features
    auto_generate_prompt=True,
    plan_enabled=True,
    react_on=True,
    safety_prompt_on=True,
    reasoning_prompt_on=True,

    # Tool Management
    tool_retry_attempts=5,
    tool_call_summary=True,
    show_tool_execution_output=True,
    function_calling_format_type="OpenAI",

    # Artifacts and Output
    artifacts_on=True,
    artifacts_output_path="./outputs",
    artifacts_file_extension=".md",
    output_type="json",

    # LLM Configuration
    model_name="gpt-4o",
    temperature=0.3,
    max_tokens=8000,
    top_p=0.95,
    frequency_penalty=0.1,
    presence_penalty=0.1,

    # MCP Integration
    mcp_url="http://localhost:8000",
    mcp_config=None,

    # Performance Settings
    timeout=300,
    retry_attempts=3,
    retry_interval=2,

    # Scheduling
    scheduled_run_date=datetime.now() + timedelta(hours=1),

    # Metadata and Organization
    tags=["analysis", "multi-modal", "advanced"],
    use_cases=[{"name": "Data Analysis", "description": "Process and analyze complex datasets"}],

    # Verbosity and Logging
    verbose=True,
    print_on=True,
    print_every_step=True,
    log_directory="./logs",
)

# Run with multiple images and streaming
def streaming_callback(token: str):
    print(token, end="", flush=True)

response = agent.run(
    task="Analyze these financial charts and provide comprehensive insights",
    imgs=["chart1.png", "chart2.png", "chart3.png"],
    streaming_callback=streaming_callback,
)

# Run batch processing
tasks = [
    "Analyze Q1 financial performance",
    "Generate Q2 projections",
    "Create executive summary",
]
batch_results = agent.run_batched(tasks)

# Run with answer validation
validated_response = agent.run(
    task="What is the current market trend?",
    correct_answer="bullish",
    max_attempts=5,
)
```
### MCP-Enabled Agent Example
```python
from swarms import Agent
from swarms.schemas.mcp_schemas import MCPConnection
# Configure MCP connection
mcp_config = MCPConnection(
    server_path="http://localhost:8000",
    server_name="my_mcp_server",
    capabilities=["tools", "filesystem"]
)

# Initialize agent with MCP integration
mcp_agent = Agent(
    agent_name="MCP-Enabled-Agent",
    system_prompt="You are an agent with access to external tools via MCP.",
    mcp_config=mcp_config,
    mcp_urls=["http://localhost:8000", "http://localhost:8001"],
    tool_call_summary=True,
    output_raw_json_from_tool_call=True
)
# Run with MCP tools
response = mcp_agent.run("Use the available tools to analyze the current system status")
```
### Multi-Image Processing Agent
```python
# Initialize agent optimized for image processing
image_agent = Agent(
    agent_name="Image-Analysis-Agent",
    system_prompt="You are an expert at analyzing images and extracting insights.",
    multi_modal=True,
    summarize_multiple_images=True,
    artifacts_on=True,
    artifacts_output_path="./image_analysis",
    artifacts_file_extension=".txt"
)

# Process multiple images with summarization
images = ["product1.jpg", "product2.jpg", "product3.jpg"]
analysis = image_agent.run(
    task="Analyze these product images and identify design patterns",
    imgs=images
)
# The agent will automatically summarize results if summarize_multiple_images=True
print(f"Analysis complete: {len(analysis)} images processed")
```
## New Features and Parameters
### Enhanced Run Method Parameters
The `run` method now supports several new parameters for advanced functionality (see the method signature and parameters documented above).
### MCP (Model Context Protocol) Integration
New parameters enable seamless MCP server integration:
| Parameter | Description |
|----------------|-----------------------------------------------------|
| `mcp_url` | Connect to a single MCP server |
| `mcp_urls` | Connect to multiple MCP servers |
| `mcp_config` | Advanced MCP configuration options |
### Advanced Reasoning and Safety
| Parameter | Description |
|----------------------|--------------------------------------------------------------------|
| `react_on` | Enable ReAct reasoning for complex problem-solving |
| `safety_prompt_on` | Add safety constraints to agent responses |
| `reasoning_prompt_on`| Enable multi-loop reasoning for complex tasks |
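A minimal configuration sketch for these flags; the model name, loop count, and task below are placeholders:

```python
from swarms import Agent

reasoning_agent = Agent(
    agent_name="Reasoning-Agent",
    model_name="gpt-4.1",      # placeholder model
    max_loops=3,
    react_on=True,             # step-by-step ReAct reasoning
    safety_prompt_on=True,     # add safety constraints to responses
    reasoning_prompt_on=True,  # allow multi-loop reasoning on hard tasks
)

response = reasoning_agent.run("Plan a phased migration of a legacy billing system.")
```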
### Performance and Resource Management
| Parameter | Description |
|--------------------------|--------------------------------------------------------------------------|
| `dynamic_context_window` | Automatically adjust context window based on available tokens |
| `tool_retry_attempts` | Configure retry behavior for tool execution |
| `summarize_multiple_images` | Automatically summarize results from multiple image processing |
### Device and Resource Management
| Parameter | Description |
|--------------------------|--------------------------------------------------------------------------|
| `device` | Specify CPU or GPU execution (`"cpu"` or `"gpu"`) |
| `device_id` | Specify which GPU device to use |
| `all_cores` | Enable use of all available CPU cores |
| `all_gpus` | Enable use of all available GPUs |
| `do_not_use_cluster_ops` | Control cluster operation usage |
### Advanced Memory and Context
| Parameter | Description |
|--------------------------|--------------------------------------------------------------------------|
| `rag_every_loop` | Query RAG database on every loop iteration |
| `memory_chunk_size` | Control memory chunk size for long-term memory |
| `auto_generate_prompt` | Automatically generate system prompts based on tasks |
| `plan_enabled` | Enable planning functionality for complex tasks |
### Artifacts and Output Management
| Parameter | Description |
|--------------------------|--------------------------------------------------------------------------|
| `artifacts_on` | Enable saving artifacts from agent execution |
| `artifacts_output_path` | Specify where to save artifacts |
| `artifacts_file_extension` | Control artifact file format |
| `output_raw_json_from_tool_call` | Control tool call output format |
### Enhanced Tool Management
| Parameter | Description |
|--------------------------|--------------------------------------------------------------------------|
| `tools_list_dictionary` | Provide tool schemas in dictionary format |
| `tool_call_summary` | Enable automatic summarization of tool calls |
| `show_tool_execution_output` | Control visibility of tool execution details |
| `function_calling_format_type` | Specify function calling format (OpenAI, etc.) |
### Advanced LLM Configuration
| Parameter | Description |
|--------------------------|--------------------------------------------------------------------------|
| `llm_args` | Pass additional arguments to the LLM |
| `llm_base_url` | Specify custom LLM API endpoint |
| `llm_api_key` | Provide LLM API key directly |
| `top_p` | Control top-p sampling parameter |
| `frequency_penalty` | Control frequency penalty |
| `presence_penalty` | Control presence penalty |
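For example, these parameters can point the agent at a self-hosted, OpenAI-compatible endpoint. The URL, environment variable, model name, and `llm_args` contents below are placeholders, not values from the Swarms docs:

```python
import os

from swarms import Agent

agent = Agent(
    agent_name="Custom-Endpoint-Agent",
    model_name="my-hosted-model",                 # hypothetical model name
    llm_base_url="https://llm.example.com/v1",    # custom OpenAI-compatible endpoint
    llm_api_key=os.getenv("CUSTOM_LLM_API_KEY"),  # key supplied directly to the agent
    llm_args={"stop": ["</answer>"]},             # extra arguments forwarded to the LLM
    top_p=0.9,
    frequency_penalty=0.2,
    presence_penalty=0.1,
    max_loops=1,
)

response = agent.run("Summarize the latest earnings call.")
```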
## Best Practices
| Best Practice / Feature | Description |
|---------------------------------------------------------|--------------------------------------------------------------------------------------------------|
| `system_prompt` | Always provide a clear and concise system prompt to guide the agent's behavior. |
| `tools` | Use tools to extend the agent's capabilities for specific tasks. |
| `retry_attempts` & error handling | Implement error handling and utilize the retry_attempts feature for robust execution. |
| `long_term_memory` | Leverage long_term_memory for tasks that require persistent information. |
| `interactive` & `dashboard` | Use interactive mode for real-time conversations and dashboard for monitoring. |
| `sentiment_analysis` | Implement sentiment_analysis for applications requiring tone management. |
| `autosave`, `save`/`load` | Utilize autosave and save/load methods for continuity across sessions. |
| `dynamic_context_window` & `tokens_checks` | Optimize token usage with dynamic_context_window and tokens_checks methods. |
| `concurrent` & `async` methods | Use concurrent and async methods for performance-critical applications. |
| `analyze_feedback` | Regularly review and analyze feedback using the analyze_feedback method. |
| `artifacts_on` | Use artifacts_on to save important outputs from agent execution. |
| `device` & `device_id` | Configure device and device_id appropriately for optimal performance. |
| `rag_every_loop` | Enable rag_every_loop when continuous context from long-term memory is needed. |
| `scheduled_run_date` | Use scheduled_run_date for automated task scheduling. |
| `run_batched` | Leverage run_batched for efficient processing of multiple related tasks. |
| `mcp_url` or `mcp_urls` | Use mcp_url or mcp_urls to extend agent capabilities with external tools. |
| `react_on` | Enable react_on for complex reasoning tasks requiring step-by-step analysis. |
| `tool_retry_attempts` | Configure tool_retry_attempts for robust tool execution in production environments. |
By following these guidelines and leveraging the Swarm Agent's extensive features, you can create powerful, flexible, and efficient autonomous agents for a wide range of applications.

# Qdrant RAG Example with Document Ingestion
This example demonstrates how to use the agent structure from `example.py` with Qdrant RAG to ingest a vast array of PDF documents and text files for advanced quantitative trading analysis.
## 🚀 Features
- **Document Ingestion**: Process PDF, TXT, and Markdown files automatically
- **Qdrant Vector Database**: High-performance vector storage with similarity search
- **Sentence Transformer Embeddings**: Local embedding generation using state-of-the-art models
- **Intelligent Chunking**: Smart text chunking with overlap for better retrieval
- **Concurrent Processing**: Multi-threaded document processing for large collections
- **RAG Integration**: Seamless integration with Swarms Agent framework
- **Financial Analysis**: Specialized for quantitative trading and financial research
## 📋 Prerequisites
- Python 3.10+
- Qdrant client (local or cloud)
- Sentence transformers for embeddings
- Swarms framework
## 🛠️ Installation
1. **Install dependencies**:
```bash
pip install -r requirements.txt
```
2. **Set up environment variables** (optional, for cloud deployment):
```bash
export QDRANT_URL="your_qdrant_url"
export QDRANT_API_KEY="your_qdrant_api_key"
```
## 🏗️ Architecture
The example consists of three main components:
### 1. DocumentProcessor
- Handles file discovery and text extraction
- Supports PDF, TXT, and Markdown formats
- Concurrent processing for large document collections
- Error handling and validation
### 2. QdrantRAGMemory
- Vector database management with Qdrant
- Intelligent text chunking with overlap
- Semantic search capabilities
- Metadata storage and retrieval
### 3. QuantitativeTradingRAGAgent
- Combines Swarms Agent with RAG capabilities
- Financial analysis specialization
- Document context enhancement
- Query processing and response generation
## 📖 Usage
### Basic Setup
```python
from qdrant_rag_example import QuantitativeTradingRAGAgent
# Initialize the agent
agent = QuantitativeTradingRAGAgent(
    agent_name="Financial-Analysis-Agent",
    collection_name="financial_documents",
    model_name="claude-sonnet-4-20250514"
)
```
### Document Ingestion
```python
# Ingest documents from a directory
documents_path = "./financial_documents"
num_ingested = agent.ingest_documents(documents_path)
print(f"Ingested {num_ingested} documents")
```
### Querying Documents
```python
# Search for relevant information
results = agent.query_documents("gold ETFs investment strategies", limit=5)
for result in results:
print(f"Document: {result['document_name']}")
print(f"Relevance: {result['similarity_score']:.3f}")
print(f"Content: {result['chunk_text'][:200]}...")
```
### Running Analysis
```python
# Run financial analysis with RAG context
task = "What are the best top 3 ETFs for gold coverage?"
response = agent.run_analysis(task)
print(response)
```
## 📁 Directory Structure
```
financial_documents/
├── research_papers/
│ ├── gold_etf_analysis.pdf
│ ├── market_research.pdf
│ └── portfolio_strategies.pdf
├── company_reports/
│ ├── annual_reports.txt
│ └── quarterly_updates.md
└── market_data/
├── historical_prices.csv
└── volatility_analysis.txt
```
## ⚙️ Configuration Options
### Agent Configuration
- `agent_name`: Name of the agent
- `collection_name`: Qdrant collection name
- `model_name`: LLM model to use
- `max_loops`: Maximum agent execution loops
- `chunk_size`: Text chunk size (default: 1000)
- `chunk_overlap`: Overlap between chunks (default: 200)
### Document Processing
- `supported_extensions`: File types to process
- `max_workers`: Concurrent processing threads
- `score_threshold`: Similarity search threshold
## 🔍 Advanced Features
### Custom Embedding Models
```python
# Use different sentence transformer models
from sentence_transformers import SentenceTransformer
custom_model = SentenceTransformer("all-mpnet-base-v2")
# Update the embedding model in QdrantRAGMemory
```
### Cloud Deployment
```python
# Connect to Qdrant cloud
agent = QuantitativeTradingRAGAgent(
    qdrant_url="https://your-instance.qdrant.io",
    qdrant_api_key="your_api_key"
)
```
### Batch Processing
```python
# Process multiple directories
directories = ["./docs1", "./docs2", "./docs3"]
for directory in directories:
    agent.ingest_documents(directory)
```
## 📊 Performance Considerations
- **Chunk Size**: Larger chunks (1000-2000 chars) for detailed analysis, smaller (500-1000) for precise retrieval
- **Overlap**: 10-20% overlap between chunks for better context continuity
- **Concurrency**: Adjust `max_workers` based on your system capabilities
- **Vector Size**: 768 dimensions for sentence-transformers, 1536 for OpenAI embeddings
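To make the chunk-size and overlap guidance concrete, here is a minimal sliding-window chunker of the kind described; the function is illustrative and not the exact implementation in `qdrant_rag_example.py`:

```python
from typing import List


def chunk_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[str]:
    """Split text into overlapping chunks (illustrative sliding window)."""
    chunks: List[str] = []
    step = chunk_size - chunk_overlap  # each step shares `chunk_overlap` chars with the previous chunk
    for start in range(0, len(text), step):
        chunk = text[start : start + chunk_size]
        if chunk:
            chunks.append(chunk)
        if start + chunk_size >= len(text):
            break
    return chunks


# 1000-character chunks with a 20% overlap, per the guidance above
sample = "quantitative trading research " * 200
pieces = chunk_text(sample, chunk_size=1000, chunk_overlap=200)
print(len(pieces), len(pieces[0]))
```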
## 🚨 Error Handling
The system includes comprehensive error handling for:
- File not found errors
- Unsupported file types
- Processing failures
- Network connectivity issues
- Invalid document content
## 🔧 Troubleshooting
### Common Issues
1. **Import Errors**: Ensure all dependencies are installed
```bash
pip install -r requirements.txt
```
2. **Memory Issues**: Reduce chunk size or use cloud Qdrant
```python
agent = QuantitativeTradingRAGAgent(chunk_size=500)
```
3. **Processing Failures**: Check file permissions and formats
```python
# Verify supported formats
processor = DocumentProcessor(supported_extensions=['.pdf', '.txt'])
```
### Performance Optimization
- Use SSD storage for document processing
- Increase `max_workers` for multi-core systems
- Consider cloud Qdrant for large document collections
- Implement document caching for frequently accessed files
## 📈 Use Cases
- **Financial Research**: Analyze market reports, earnings calls, and research papers
- **Legal Document Review**: Process contracts, regulations, and case law
- **Academic Research**: Index research papers and academic literature
- **Compliance Monitoring**: Track regulatory changes and compliance requirements
- **Risk Assessment**: Analyze risk reports and market analysis
## 🤝 Contributing
To extend this example:
1. Add support for additional file formats
2. Implement custom embedding strategies
3. Add document versioning and change tracking
4. Integrate with other vector databases
5. Add document summarization capabilities
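For the first item, a minimal starting point is to pass extra extensions to `DocumentProcessor`; formats that can be read as plain text are already handled either by `data_to_text` or by the UTF-8 fallback reader, so no new extraction code is needed for them.

```python
# Sketch: extend the supported formats with plain-text-friendly extensions
processor = DocumentProcessor(
    supported_extensions=[".pdf", ".txt", ".md", ".csv", ".rst"]
)
documents = processor.process_directory("./financial_documents")
```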
## 📄 License
This example is part of the Swarms framework and follows the same licensing terms.
## 🆘 Support
For issues and questions:
- Check the Swarms documentation
- Review the example code and error messages
- Ensure all dependencies are properly installed
- Verify Qdrant connection and configuration

@ -0,0 +1,882 @@
"""
Qdrant RAG Example with Document Ingestion
This example demonstrates how to use the agent structure from example.py with Qdrant RAG
to ingest a vast array of PDF documents and text files for advanced quantitative trading analysis.
Features:
- Document ingestion from multiple file types (PDF, TXT, MD)
- Qdrant vector database integration
- Sentence transformer embeddings
- Comprehensive document processing pipeline
- Agent with RAG capabilities for financial analysis
"""
import os
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Union
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
from qdrant_client import QdrantClient
from qdrant_client.http import models
from qdrant_client.http.models import Distance, VectorParams
from sentence_transformers import SentenceTransformer
from swarms import Agent
from swarms.utils.pdf_to_text import pdf_to_text
from swarms.utils.data_to_text import data_to_text
class DocumentProcessor:
"""
Handles document processing and text extraction from various file formats.
This class provides functionality to process PDF, TXT, and Markdown files,
extracting text content for vectorization and storage in the RAG system.
"""
def __init__(
self, supported_extensions: Optional[List[str]] = None
):
"""
Initialize the DocumentProcessor.
Args:
supported_extensions: List of supported file extensions.
Defaults to ['.pdf', '.txt', '.md']
"""
if supported_extensions is None:
supported_extensions = [".pdf", ".txt", ".md"]
self.supported_extensions = supported_extensions
def process_document(
self, file_path: Union[str, Path]
) -> Optional[Dict[str, str]]:
"""
Process a single document and extract its text content.
Args:
file_path: Path to the document file
Returns:
Dictionary containing document metadata and extracted text, or None if processing fails
"""
file_path = Path(file_path)
if not file_path.exists():
print(f"File not found: {file_path}")
return None
if file_path.suffix.lower() not in self.supported_extensions:
print(f"Unsupported file type: {file_path.suffix}")
return None
try:
# Extract text based on file type
if file_path.suffix.lower() == ".pdf":
try:
text_content = pdf_to_text(str(file_path))
except Exception as pdf_error:
print(f"Error extracting PDF text: {pdf_error}")
# Fallback: try to read as text file
with open(
file_path,
"r",
encoding="utf-8",
errors="ignore",
) as f:
text_content = f.read()
else:
try:
text_content = data_to_text(str(file_path))
except Exception as data_error:
print(f"Error extracting text: {data_error}")
# Fallback: try to read as text file
with open(
file_path,
"r",
encoding="utf-8",
errors="ignore",
) as f:
text_content = f.read()
# Ensure text_content is a string
if callable(text_content):
print(
f"Warning: {file_path} returned a callable, trying to call it..."
)
try:
text_content = text_content()
except Exception as call_error:
print(f"Error calling callable: {call_error}")
return None
if not text_content or not isinstance(text_content, str):
print(
f"No valid text content extracted from: {file_path}"
)
return None
# Clean the text content
text_content = str(text_content).strip()
return {
"file_path": str(file_path),
"file_name": file_path.name,
"file_type": file_path.suffix.lower(),
"text_content": text_content,
"file_size": file_path.stat().st_size,
"processed_at": datetime.utcnow().isoformat(),
}
except Exception as e:
print(f"Error processing {file_path}: {str(e)}")
return None
def process_directory(
self, directory_path: Union[str, Path], max_workers: int = 4
) -> List[Dict[str, str]]:
"""
Process all supported documents in a directory concurrently.
Args:
directory_path: Path to the directory containing documents
max_workers: Maximum number of concurrent workers for processing
Returns:
List of processed document dictionaries
"""
directory_path = Path(directory_path)
if not directory_path.is_dir():
print(f"Directory not found: {directory_path}")
return []
# Find all supported files
supported_files = []
for ext in self.supported_extensions:
supported_files.extend(directory_path.rglob(f"*{ext}"))
            supported_files.extend(
                directory_path.rglob(f"*{ext.upper()}")
            )
        # De-duplicate in case both patterns matched the same file
        # (possible on case-insensitive filesystems)
        supported_files = sorted(set(supported_files))
if not supported_files:
print(f"No supported files found in: {directory_path}")
return []
print(f"Found {len(supported_files)} files to process")
# Process files concurrently
processed_documents = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_file = {
executor.submit(
self.process_document, file_path
): file_path
for file_path in supported_files
}
for future in concurrent.futures.as_completed(
future_to_file
):
file_path = future_to_file[future]
try:
result = future.result()
if result:
processed_documents.append(result)
print(f"Processed: {result['file_name']}")
except Exception as e:
print(f"Error processing {file_path}: {str(e)}")
print(
f"Successfully processed {len(processed_documents)} documents"
)
return processed_documents
class QdrantRAGMemory:
"""
Enhanced Qdrant memory system for RAG operations with document storage.
This class extends the basic Qdrant memory system to handle document ingestion,
chunking, and semantic search for large document collections.
"""
def __init__(
self,
collection_name: str = "document_memories",
vector_size: int = 384, # Default size for all-MiniLM-L6-v2
url: Optional[str] = None,
api_key: Optional[str] = None,
chunk_size: int = 1000,
chunk_overlap: int = 200,
):
"""
Initialize the Qdrant RAG memory system.
Args:
collection_name: Name of the Qdrant collection to use
vector_size: Dimension of the embedding vectors
url: Optional Qdrant server URL (defaults to local)
api_key: Optional Qdrant API key for cloud deployment
chunk_size: Size of text chunks for processing
chunk_overlap: Overlap between consecutive chunks
"""
self.collection_name = collection_name
self.vector_size = vector_size
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
# Initialize Qdrant client
if url and api_key:
self.client = QdrantClient(url=url, api_key=api_key)
else:
self.client = QdrantClient(
":memory:"
) # Local in-memory storage
# Initialize embedding model
self.embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
# Get the actual embedding dimension from the model
sample_text = "Sample text for dimension check"
sample_embedding = self.embedding_model.encode(sample_text)
actual_dimension = len(sample_embedding)
# Update vector_size to match the actual model dimension
if actual_dimension != self.vector_size:
print(
f"Updating vector size from {self.vector_size} to {actual_dimension} to match model"
)
self.vector_size = actual_dimension
# Create collection if it doesn't exist
self._create_collection()
def _create_collection(self):
"""Create the Qdrant collection if it doesn't exist."""
collections = self.client.get_collections().collections
exists = any(
col.name == self.collection_name for col in collections
)
if not exists:
self.client.create_collection(
collection_name=self.collection_name,
vectors_config=VectorParams(
size=self.vector_size, distance=Distance.COSINE
),
)
print(
f"Created Qdrant collection: {self.collection_name}"
)
def _chunk_text(self, text: str) -> List[str]:
"""
Split text into overlapping chunks for better retrieval.
Args:
text: Text content to chunk
Returns:
List of text chunks
"""
# Ensure text is a string
if not isinstance(text, str):
text = str(text)
if len(text) <= self.chunk_size:
return [text]
chunks = []
start = 0
while start < len(text):
end = start + self.chunk_size
# Try to break at sentence boundaries
if end < len(text):
# Look for sentence endings
for i in range(end, max(start, end - 100), -1):
if text[i] in ".!?":
end = i + 1
break
chunk = text[start:end].strip()
if chunk:
chunks.append(chunk)
            # Stop once the final chunk reaches the end of the text;
            # otherwise the overlap would emit a duplicate tail chunk
            if end >= len(text):
                break
            start = end - self.chunk_overlap
return chunks
def add_document(
self, document_data: Dict[str, str]
) -> List[str]:
"""
Add a document to the memory system with chunking.
Args:
document_data: Dictionary containing document information
Returns:
List of memory IDs for the stored chunks
"""
text_content = document_data["text_content"]
# Ensure text_content is a string
if not isinstance(text_content, str):
print(
f"Warning: text_content is not a string: {type(text_content)}"
)
text_content = str(text_content)
chunks = self._chunk_text(text_content)
memory_ids = []
for i, chunk in enumerate(chunks):
# Generate embedding for the chunk
embedding = self.embedding_model.encode(chunk).tolist()
# Prepare metadata
metadata = {
"document_name": document_data["file_name"],
"document_path": document_data["file_path"],
"document_type": document_data["file_type"],
"chunk_index": i,
"total_chunks": len(chunks),
"chunk_text": chunk,
"timestamp": datetime.utcnow().isoformat(),
"file_size": document_data["file_size"],
}
# Store the chunk
memory_id = str(uuid.uuid4())
self.client.upsert(
collection_name=self.collection_name,
points=[
models.PointStruct(
id=memory_id,
payload=metadata,
vector=embedding,
)
],
)
memory_ids.append(memory_id)
print(
f"Added document '{document_data['file_name']}' with {len(chunks)} chunks"
)
return memory_ids
def add_documents_batch(
self, documents: List[Dict[str, str]]
) -> List[str]:
"""
Add multiple documents to the memory system.
Args:
documents: List of document dictionaries
Returns:
List of all memory IDs
"""
all_memory_ids = []
for document in documents:
memory_ids = self.add_document(document)
all_memory_ids.extend(memory_ids)
return all_memory_ids
def add(self, text: str, metadata: Optional[Dict] = None) -> str:
"""
Add a text entry to the memory system (required by Swarms interface).
Args:
text: The text content to add
metadata: Optional metadata for the entry
Returns:
str: ID of the stored memory
"""
if metadata is None:
metadata = {}
# Generate embedding for the text
embedding = self.embedding_model.encode(text).tolist()
# Prepare metadata
memory_metadata = {
"text": text,
"timestamp": datetime.utcnow().isoformat(),
"source": "agent_memory",
}
memory_metadata.update(metadata)
# Store the point
memory_id = str(uuid.uuid4())
self.client.upsert(
collection_name=self.collection_name,
points=[
models.PointStruct(
id=memory_id,
payload=memory_metadata,
vector=embedding,
)
],
)
return memory_id
def query(
self,
query_text: str,
limit: int = 5,
score_threshold: float = 0.7,
include_metadata: bool = True,
) -> List[Dict]:
"""
Query memories based on text similarity.
Args:
query_text: The text query to search for
limit: Maximum number of results to return
score_threshold: Minimum similarity score threshold
include_metadata: Whether to include metadata in results
Returns:
List of matching memories with their metadata
"""
try:
# Check if collection has any points
collection_info = self.client.get_collection(
self.collection_name
)
if collection_info.points_count == 0:
print(
"Warning: Collection is empty, no documents to query"
)
return []
# Generate embedding for the query
query_embedding = self.embedding_model.encode(
query_text
).tolist()
# Search in Qdrant
results = self.client.search(
collection_name=self.collection_name,
query_vector=query_embedding,
limit=limit,
score_threshold=score_threshold,
)
memories = []
for res in results:
memory = res.payload.copy()
memory["similarity_score"] = res.score
if not include_metadata:
# Keep only essential information
memory = {
"chunk_text": memory.get("chunk_text", ""),
"document_name": memory.get(
"document_name", ""
),
"similarity_score": memory[
"similarity_score"
],
}
memories.append(memory)
return memories
except Exception as e:
print(f"Error querying collection: {e}")
return []
def get_collection_stats(self) -> Dict:
"""
Get statistics about the collection.
Returns:
Dictionary containing collection statistics
"""
try:
collection_info = self.client.get_collection(
self.collection_name
)
return {
"collection_name": self.collection_name,
"vector_size": collection_info.config.params.vectors.size,
"distance": collection_info.config.params.vectors.distance,
"points_count": collection_info.points_count,
}
except Exception as e:
print(f"Error getting collection stats: {e}")
return {}
def clear_collection(self):
"""Clear all memories from the collection."""
self.client.delete_collection(self.collection_name)
self._create_collection()
print(f"Cleared collection: {self.collection_name}")
class QuantitativeTradingRAGAgent:
"""
Advanced quantitative trading agent with RAG capabilities for document analysis.
This agent combines the structure from example.py with Qdrant RAG to provide
comprehensive financial analysis based on ingested documents.
"""
def __init__(
self,
agent_name: str = "Quantitative-Trading-RAG-Agent",
collection_name: str = "financial_documents",
qdrant_url: Optional[str] = None,
qdrant_api_key: Optional[str] = None,
model_name: str = "claude-sonnet-4-20250514",
max_loops: int = 1,
chunk_size: int = 1000,
chunk_overlap: int = 200,
):
"""
Initialize the Quantitative Trading RAG Agent.
Args:
agent_name: Name of the agent
collection_name: Name of the Qdrant collection
qdrant_url: Optional Qdrant server URL
qdrant_api_key: Optional Qdrant API key
model_name: LLM model to use
max_loops: Maximum number of agent loops
chunk_size: Size of text chunks for processing
chunk_overlap: Overlap between consecutive chunks
"""
self.agent_name = agent_name
self.collection_name = collection_name
# Initialize document processor
self.document_processor = DocumentProcessor()
# Initialize Qdrant RAG memory
self.rag_memory = QdrantRAGMemory(
collection_name=collection_name,
url=qdrant_url,
api_key=qdrant_api_key,
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
)
# Initialize the agent with RAG capabilities
self.agent = Agent(
agent_name=agent_name,
agent_description="Advanced quantitative trading and algorithmic analysis agent with RAG capabilities",
system_prompt="""You are an expert quantitative trading agent with deep expertise in:
- Algorithmic trading strategies and implementation
- Statistical arbitrage and market making
- Risk management and portfolio optimization
- High-frequency trading systems
- Market microstructure analysis
- Quantitative research methodologies
- Financial mathematics and stochastic processes
- Machine learning applications in trading
Your core responsibilities include:
1. Developing and backtesting trading strategies
2. Analyzing market data and identifying alpha opportunities
3. Implementing risk management frameworks
4. Optimizing portfolio allocations
5. Conducting quantitative research
6. Monitoring market microstructure
7. Evaluating trading system performance
You have access to a comprehensive document database through RAG (Retrieval-Augmented Generation).
When answering questions, you can search through this database to find relevant information
and provide evidence-based responses.
You maintain strict adherence to:
- Mathematical rigor in all analyses
- Statistical significance in strategy development
- Risk-adjusted return optimization
- Market impact minimization
- Regulatory compliance
- Transaction cost analysis
- Performance attribution
You communicate in precise, technical terms while maintaining clarity for stakeholders.""",
model_name=model_name,
dynamic_temperature_enabled=True,
output_type="str-all-except-first",
max_loops=max_loops,
dynamic_context_window=True,
long_term_memory=self.rag_memory,
)
def ingest_documents(
self, documents_path: Union[str, Path]
) -> int:
"""
Ingest documents from a directory into the RAG system.
Args:
documents_path: Path to directory containing documents
Returns:
Number of documents successfully ingested
"""
print(f"Starting document ingestion from: {documents_path}")
try:
# Process documents
processed_documents = (
self.document_processor.process_directory(
documents_path
)
)
if not processed_documents:
print("No documents to ingest")
return 0
# Add documents to RAG memory
memory_ids = self.rag_memory.add_documents_batch(
processed_documents
)
print(
f"Successfully ingested {len(processed_documents)} documents"
)
print(f"Created {len(memory_ids)} memory chunks")
return len(processed_documents)
except Exception as e:
print(f"Error during document ingestion: {e}")
import traceback
traceback.print_exc()
return 0
def query_documents(
self, query: str, limit: int = 5
) -> List[Dict]:
"""
Query the document database for relevant information.
Args:
query: The query text
limit: Maximum number of results to return
Returns:
List of relevant document chunks
"""
return self.rag_memory.query(query, limit=limit)
def run_analysis(self, task: str) -> str:
"""
Run a financial analysis task using the agent with RAG capabilities.
Args:
task: The analysis task to perform
Returns:
Agent's response to the task
"""
print(f"Running analysis task: {task}")
# First, query the document database for relevant context
relevant_docs = self.query_documents(task, limit=3)
if relevant_docs:
# Enhance the task with relevant document context
context = "\n\nRelevant Document Information:\n"
for i, doc in enumerate(relevant_docs, 1):
context += f"\nDocument {i}: {doc.get('document_name', 'Unknown')}\n"
context += f"Relevance Score: {doc.get('similarity_score', 0):.3f}\n"
context += (
f"Content: {doc.get('chunk_text', '')[:500]}...\n"
)
enhanced_task = f"{task}\n\n{context}"
else:
enhanced_task = task
# Run the agent
response = self.agent.run(enhanced_task)
return response
def get_database_stats(self) -> Dict:
"""
Get statistics about the document database.
Returns:
Dictionary containing database statistics
"""
return self.rag_memory.get_collection_stats()
def main():
"""
Main function demonstrating the Qdrant RAG agent with document ingestion.
"""
from datetime import datetime
# Example usage
print("🚀 Initializing Quantitative Trading RAG Agent...")
# Initialize the agent (you can set environment variables for Qdrant cloud)
agent = QuantitativeTradingRAGAgent(
agent_name="Quantitative-Trading-RAG-Agent",
collection_name="financial_documents",
qdrant_url=os.getenv(
"QDRANT_URL"
), # Optional: For cloud deployment
qdrant_api_key=os.getenv(
"QDRANT_API_KEY"
), # Optional: For cloud deployment
model_name="claude-sonnet-4-20250514",
max_loops=1,
chunk_size=1000,
chunk_overlap=200,
)
# Example: Ingest documents from a directory
documents_path = "documents" # Path to your documents
if os.path.exists(documents_path):
print(f"Found documents directory: {documents_path}")
try:
agent.ingest_documents(documents_path)
except Exception as e:
print(f"Error ingesting documents: {e}")
print("Continuing without document ingestion...")
else:
print(f"Documents directory not found: {documents_path}")
print("Creating a sample document for demonstration...")
# Create a sample document
try:
sample_doc = {
"file_path": "sample_financial_analysis.txt",
"file_name": "sample_financial_analysis.txt",
"file_type": ".txt",
"text_content": """
Gold ETFs: A Comprehensive Investment Guide
Gold ETFs (Exchange-Traded Funds) provide investors with exposure to gold prices
without the need to physically store the precious metal. These funds track the
price of gold and offer several advantages including liquidity, diversification,
and ease of trading.
Top Gold ETFs include:
1. SPDR Gold Shares (GLD) - Largest gold ETF with high liquidity
2. iShares Gold Trust (IAU) - Lower expense ratio alternative
3. Aberdeen Standard Physical Gold ETF (SGOL) - Swiss storage option
Investment strategies for gold ETFs:
- Portfolio diversification (5-10% allocation)
- Inflation hedge
- Safe haven during market volatility
- Tactical trading opportunities
Market analysis shows that gold has historically served as a store of value
and hedge against inflation. Recent market conditions have increased interest
in gold investments due to economic uncertainty and geopolitical tensions.
""",
"file_size": 1024,
"processed_at": datetime.utcnow().isoformat(),
}
# Add the sample document to the RAG memory
memory_ids = agent.rag_memory.add_document(sample_doc)
print(
f"Added sample document with {len(memory_ids)} chunks"
)
except Exception as e:
print(f"Error creating sample document: {e}")
print("Continuing without sample document...")
# Example: Query the database
print("\n📊 Querying document database...")
try:
query_results = agent.query_documents(
"gold ETFs investment strategies", limit=3
)
print(f"Found {len(query_results)} relevant document chunks")
if query_results:
print("Sample results:")
for i, result in enumerate(query_results[:2], 1):
print(
f" {i}. {result.get('document_name', 'Unknown')} (Score: {result.get('similarity_score', 0):.3f})"
)
else:
print(
"No documents found in database. This is expected if no documents were ingested."
)
except Exception as e:
print(f"❌ Query failed: {e}")
# Example: Run financial analysis
print("\n💹 Running financial analysis...")
analysis_task = "What are the best top 3 ETFs for gold coverage and what are their key characteristics?"
try:
response = agent.run_analysis(analysis_task)
print("\n📈 Analysis Results:")
print(response)
except Exception as e:
print(f"❌ Analysis failed: {e}")
print("This might be due to API key or model access issues.")
print("Continuing with database statistics...")
# Try a simpler query that doesn't require the LLM
print("\n🔍 Trying simple document query instead...")
try:
simple_results = agent.query_documents(
"what do you see in the document?", limit=2
)
if simple_results:
print("Simple query results:")
for i, result in enumerate(simple_results, 1):
print(
f" {i}. {result.get('document_name', 'Unknown')}"
)
print(
f" Content preview: {result.get('chunk_text', '')[:100]}..."
)
else:
print("No results from simple query")
except Exception as simple_error:
print(f"Simple query also failed: {simple_error}")
# Get database statistics
print("\n📊 Database Statistics:")
try:
stats = agent.get_database_stats()
for key, value in stats.items():
print(f" {key}: {value}")
except Exception as e:
print(f"❌ Failed to get database statistics: {e}")
print("\n✅ Example completed successfully!")
print("💡 To test with your own documents:")
print(" 1. Create a 'documents' directory")
print(" 2. Add PDF, TXT, or MD files")
print(" 3. Run the script again")
if __name__ == "__main__":
main()

@ -0,0 +1,184 @@
"""
Simple Example: Qdrant RAG with Document Ingestion
This is a simplified example showing the basic usage of the Qdrant RAG system
for document ingestion and querying.
"""
from pathlib import Path
from examples.single_agent.rag.qdrant_rag_example import (
QuantitativeTradingRAGAgent,
)
def create_sample_documents():
"""
Create sample documents for demonstration purposes.
"""
# Create a sample documents directory
docs_dir = Path("./sample_documents")
docs_dir.mkdir(exist_ok=True)
# Create sample text files
sample_texts = {
"gold_etf_guide.txt": """
Gold ETFs: A Comprehensive Guide
Gold ETFs (Exchange-Traded Funds) provide investors with exposure to gold prices
without the need to physically store the precious metal. These funds track the
price of gold and offer several advantages including liquidity, diversification,
and ease of trading.
Top Gold ETFs include:
1. SPDR Gold Shares (GLD) - Largest gold ETF with high liquidity
2. iShares Gold Trust (IAU) - Lower expense ratio alternative
3. Aberdeen Standard Physical Gold ETF (SGOL) - Swiss storage option
Investment strategies for gold ETFs:
- Portfolio diversification (5-10% allocation)
- Inflation hedge
- Safe haven during market volatility
- Tactical trading opportunities
""",
"market_analysis.txt": """
Market Analysis: Gold Investment Trends
Gold has historically served as a store of value and hedge against inflation.
Recent market conditions have increased interest in gold investments due to:
- Economic uncertainty and geopolitical tensions
- Inflation concerns and currency devaluation
- Central bank policies and interest rate environment
- Portfolio diversification needs
Key factors affecting gold prices:
- US Dollar strength/weakness
- Real interest rates
- Central bank gold purchases
- Market risk sentiment
- Supply and demand dynamics
Investment recommendations:
- Consider gold as 5-15% of total portfolio
- Use dollar-cost averaging for entry
- Monitor macroeconomic indicators
- Rebalance periodically
""",
"portfolio_strategies.txt": """
Portfolio Strategies: Incorporating Gold
Strategic allocation to gold can enhance portfolio performance through:
1. Risk Reduction:
- Negative correlation with equities during crises
- Volatility dampening effects
- Drawdown protection
2. Return Enhancement:
- Long-term appreciation potential
- Inflation-adjusted returns
- Currency diversification benefits
3. Implementation Methods:
- Physical gold (coins, bars)
- Gold ETFs and mutual funds
- Gold mining stocks
- Gold futures and options
Optimal allocation ranges:
- Conservative: 5-10%
- Moderate: 10-15%
- Aggressive: 15-20%
Rebalancing frequency: Quarterly to annually
""",
}
# Write sample files
for filename, content in sample_texts.items():
file_path = docs_dir / filename
with open(file_path, "w", encoding="utf-8") as f:
f.write(content.strip())
print(
f"Created {len(sample_texts)} sample documents in {docs_dir}"
)
return docs_dir
def main():
"""
Main function demonstrating basic Qdrant RAG usage.
"""
print("🚀 Qdrant RAG Simple Example")
print("=" * 50)
# Create sample documents
docs_dir = create_sample_documents()
# Initialize the RAG agent
print("\n📊 Initializing Quantitative Trading RAG Agent...")
agent = QuantitativeTradingRAGAgent(
agent_name="Simple-Financial-Agent",
collection_name="sample_financial_docs",
model_name="claude-sonnet-4-20250514",
chunk_size=800, # Smaller chunks for sample documents
chunk_overlap=100,
)
# Ingest the sample documents
print(f"\n📚 Ingesting documents from {docs_dir}...")
num_ingested = agent.ingest_documents(docs_dir)
print(f"✅ Successfully ingested {num_ingested} documents")
# Query the document database
print("\n🔍 Querying document database...")
queries = [
"What are the top gold ETFs?",
"How should I allocate gold in my portfolio?",
"What factors affect gold prices?",
]
for query in queries:
print(f"\nQuery: {query}")
results = agent.query_documents(query, limit=2)
print(f"Found {len(results)} relevant chunks:")
for i, result in enumerate(results, 1):
print(
f" {i}. {result['document_name']} (Score: {result['similarity_score']:.3f})"
)
print(f" Content: {result['chunk_text'][:150]}...")
# Run a comprehensive analysis
print("\n💹 Running comprehensive analysis...")
analysis_task = "Based on the available documents, provide a summary of gold ETF investment strategies and portfolio allocation recommendations."
try:
response = agent.run_analysis(analysis_task)
print("\n📈 Analysis Results:")
print("-" * 30)
print(response)
except Exception as e:
print(f"❌ Analysis failed: {e}")
print("This might be due to API key or model access issues.")
# Show database statistics
print("\n📊 Database Statistics:")
stats = agent.get_database_stats()
for key, value in stats.items():
print(f" {key}: {value}")
# Cleanup
print("\n🧹 Cleaning up sample documents...")
import shutil
if docs_dir.exists():
shutil.rmtree(docs_dir)
print("Sample documents removed.")
print("\n✅ Example completed successfully!")
if __name__ == "__main__":
main()

@ -48,17 +48,11 @@ from swarms.schemas.agent_mcp_errors import (
from swarms.schemas.agent_step_schemas import ManySteps, Step
from swarms.schemas.base_schemas import (
AgentChatCompletionResponse,
ChatCompletionResponseChoice,
ChatMessageResponse,
)
from swarms.schemas.conversation_schema import ConversationSchema
from swarms.schemas.mcp_schemas import (
MCPConnection,
)
from swarms.structs.agent_rag_handler import (
AgentRAGHandler,
RAGConfig,
)
from swarms.structs.agent_roles import agent_roles
from swarms.structs.conversation import Conversation
from swarms.structs.ma_utils import set_random_models_for_agents
@ -78,6 +72,7 @@ from swarms.tools.py_func_to_openai_func_str import (
convert_multiple_functions_to_openai_function_schema,
)
from swarms.utils.data_to_text import data_to_text
from swarms.utils.dynamic_context_window import dynamic_auto_chunking
from swarms.utils.file_processing import create_file_in_folder
from swarms.utils.formatter import formatter
from swarms.utils.generate_keys import generate_api_key
@ -116,8 +111,6 @@ ToolUsageType = Union[BaseModel, Dict[str, Any]]
# Agent Exceptions
class AgentError(Exception):
"""Base class for all agent-related exceptions."""
@ -430,7 +423,6 @@ class Agent:
conversation_schema: Optional[ConversationSchema] = None,
llm_base_url: Optional[str] = None,
llm_api_key: Optional[str] = None,
rag_config: Optional[RAGConfig] = None,
tool_call_summary: bool = True,
output_raw_json_from_tool_call: bool = False,
summarize_multiple_images: bool = False,
@ -570,7 +562,6 @@ class Agent:
self.conversation_schema = conversation_schema
self.llm_base_url = llm_base_url
self.llm_api_key = llm_api_key
self.rag_config = rag_config
self.tool_call_summary = tool_call_summary
self.output_raw_json_from_tool_call = (
output_raw_json_from_tool_call
@ -625,22 +616,11 @@ class Agent:
if self.random_models_on is True:
self.model_name = set_random_models_for_agents()
if self.long_term_memory is not None:
self.rag_handler = self.rag_setup_handling()
if self.dashboard is True:
self.print_dashboard()
self.reliability_check()
def rag_setup_handling(self):
return AgentRAGHandler(
long_term_memory=self.long_term_memory,
config=self.rag_config,
agent_name=self.agent_name,
verbose=self.verbose,
)
def setup_tools(self):
return BaseTool(
tools=self.tools,
@ -1022,6 +1002,42 @@ class Agent:
title=f"Agent {self.agent_name} Dashboard",
)
def handle_rag_query(self, query: str):
"""
Handle RAG query
"""
try:
logger.info(
f"Agent: {self.agent_name} Querying RAG memory for: {query}"
)
output = self.long_term_memory.query(
query,
)
output = dynamic_auto_chunking(
content=output,
context_length=self.max_tokens,
tokenizer_model_name=self.model_name,
)
self.short_memory.add(
role="system",
content=(
f"🔍 [RAG Query Initiated]\n"
f"━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n"
f"📝 Query:\n{query}\n\n"
f"📚 Retrieved Knowledge (RAG Output):\n{output}\n"
f"━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n"
f"💡 The above information was retrieved from the agent's long-term memory using Retrieval-Augmented Generation (RAG). "
f"Use this context to inform your next response or reasoning step."
),
)
except Exception as e:
logger.error(
f"Agent: {self.agent_name} Error handling RAG query: {e} Traceback: {traceback.format_exc()}"
)
raise e
# Main function
def _run(
self,
@ -1032,21 +1048,30 @@ class Agent:
**kwargs,
) -> Any:
"""
run the agent
Execute the agent's main loop for a given task.
This function manages the agent's reasoning and action loop, including:
- Initializing the task and context
- Handling Retrieval-Augmented Generation (RAG) queries if enabled
- Planning (if enabled)
- Managing internal reasoning loops and memory
- Optionally processing images and streaming output
- Autosaving agent state if configured
Args:
task (str): The task to be performed.
img (str): The image to be processed.
is_last (bool): Indicates if this is the last task.
task (Optional[Union[str, Any]]): The task or prompt for the agent to process.
img (Optional[str]): Optional image path or data to be processed by the agent.
streaming_callback (Optional[Callable[[str], None]]): Optional callback for streaming output.
*args: Additional positional arguments for extensibility.
**kwargs: Additional keyword arguments for extensibility.
Returns:
Any: The output of the agent.
(string, list, json, dict, yaml, xml)
Any: The agent's output, which may be a string, list, JSON, dict, YAML, XML, or other type depending on the agent's configuration and the task.
Examples:
agent(task="What is the capital of France?")
agent(task="What is the capital of France?", img="path/to/image.jpg")
agent(task="What is the capital of France?", img="path/to/image.jpg", is_last=True)
agent(task="Summarize this document.", img="path/to/image.jpg")
agent(task="Analyze this image.", img="path/to/image.jpg", is_last=True)
"""
try:
@ -1056,6 +1081,13 @@ class Agent:
self.short_memory.add(role=self.user_name, content=task)
# Handle RAG query only once
if (
self.long_term_memory is not None
and self.rag_every_loop is False
):
self.handle_rag_query(task)
if self.plan_enabled is True:
self.plan(task)
@ -1076,6 +1108,13 @@ class Agent:
):
loop_count += 1
# Handle RAG query every loop
if (
self.long_term_memory is not None
and self.rag_every_loop is True
):
self.handle_rag_query(task)
if (
isinstance(self.max_loops, int)
and self.max_loops >= 2
@ -1284,7 +1323,7 @@ class Agent:
traceback_info = traceback.format_exc()
logger.error(
f"An error occurred while running your agent {self.agent_name}.\n"
f"Agent: {self.agent_name} An error occurred while running your agent.\n"
f"Error Type: {error_type}\n"
f"Error Message: {error_message}\n"
f"Traceback:\n{traceback_info}\n"
@ -1508,8 +1547,16 @@ class Agent:
"Max loops is not provided or is set to 0. Please set max loops to 1 or more."
)
if self.max_tokens is None or self.max_tokens == 0:
self.max_tokens = get_max_tokens(self.model_name)
# Ensure max_tokens is set to a valid value based on the model, with a robust fallback.
if self.max_tokens is None or self.max_tokens <= 0:
suggested_tokens = get_max_tokens(self.model_name)
if suggested_tokens is not None and suggested_tokens > 0:
self.max_tokens = suggested_tokens
else:
logger.warning(
f"Could not determine max_tokens for model '{self.model_name}'. Falling back to default value of 8192."
)
self.max_tokens = 8192
if self.context_length is None or self.context_length == 0:
raise AgentInitializationError(
@ -2097,40 +2144,6 @@ class Agent:
)
raise error
def memory_query(self, task: str = None, *args, **kwargs) -> None:
try:
# Query the long term memory
if self.long_term_memory is not None:
formatter.print_panel(f"Querying RAG for: {task}")
memory_retrieval = self.long_term_memory.query(
task, *args, **kwargs
)
memory_retrieval = (
f"Documents Available: {str(memory_retrieval)}"
)
# # Count the tokens
# memory_token_count = count_tokens(
# memory_retrieval
# )
# if memory_token_count > self.memory_chunk_size:
# # Truncate the memory by the memory chunk size
# memory_retrieval = self.truncate_string_by_tokens(
# memory_retrieval, self.memory_chunk_size
# )
self.short_memory.add(
role="Database",
content=memory_retrieval,
)
return None
except Exception as e:
logger.error(f"An error occurred: {e}")
raise e
def sentiment_analysis_handler(self, response: str = None):
"""
Performs sentiment analysis on the given response and stores the result in the short-term memory.
@ -2223,107 +2236,6 @@ class Agent:
return out
def log_step_metadata(
self, loop: int, task: str, response: str
) -> Step:
"""Log metadata for each step of agent execution."""
# Generate unique step ID
step_id = f"step_{loop}_{uuid.uuid4().hex}"
# Calculate token usage
# full_memory = self.short_memory.return_history_as_string()
# prompt_tokens = count_tokens(full_memory)
# completion_tokens = count_tokens(response)
# total_tokens = prompt_tokens + completion_tokens
total_tokens = (count_tokens(task) + count_tokens(response),)
# # Get memory responses
# memory_responses = {
# "short_term": (
# self.short_memory.return_history_as_string()
# if self.short_memory
# else None
# ),
# "long_term": (
# self.long_term_memory.query(task)
# if self.long_term_memory
# else None
# ),
# }
# # Get tool responses if tool was used
# if self.tools:
# try:
# tool_call_output = parse_and_execute_json(
# self.tools, response, parse_md=True
# )
# if tool_call_output:
# {
# "tool_name": tool_call_output.get(
# "tool_name", "unknown"
# ),
# "tool_args": tool_call_output.get("args", {}),
# "tool_output": str(
# tool_call_output.get("output", "")
# ),
# }
# except Exception as e:
# logger.debug(
# f"No tool call detected in response: {e}"
# )
# Create memory usage tracking
# memory_usage = {
# "short_term": (
# len(self.short_memory.messages)
# if self.short_memory
# else 0
# ),
# "long_term": (
# self.long_term_memory.count
# if self.long_term_memory
# else 0
# ),
# "responses": memory_responses,
# }
step_log = Step(
step_id=step_id,
time=time.time(),
tokens=total_tokens,
response=AgentChatCompletionResponse(
id=self.id,
agent_name=self.agent_name,
object="chat.completion",
choices=ChatCompletionResponseChoice(
index=loop,
input=task,
message=ChatMessageResponse(
role=self.agent_name,
content=response,
),
),
# usage=UsageInfo(
# prompt_tokens=prompt_tokens,
# completion_tokens=completion_tokens,
# total_tokens=total_tokens,
# ),
# tool_calls=(
# [] if tool_response is None else [tool_response]
# ),
# memory_usage=None,
),
)
# Update total tokens if agent_output exists
# if hasattr(self, "agent_output"):
# self.agent_output.total_tokens += (
# self.response.total_tokens
# )
# Add step to agent output tracking
self.step_pool.append(step_log)
def update_tool_usage(
self,
step_id: str,
