From 7f6ba0eb732279f4ca10d1d93274d4b5df50bc40 Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Wed, 20 Aug 2025 15:00:53 -0700 Subject: [PATCH] [Agent][Docs] --- docs/swarms/structs/agent.md | 807 +++++++++++----- examples/single_agent/rag/README.md | 230 +++++ .../single_agent/rag/qdrant_rag_example.py | 882 ++++++++++++++++++ examples/single_agent/rag/simple_example.py | 184 ++++ swarms/structs/agent.py | 246 ++--- 5 files changed, 1924 insertions(+), 425 deletions(-) create mode 100644 examples/single_agent/rag/README.md create mode 100644 examples/single_agent/rag/qdrant_rag_example.py create mode 100644 examples/single_agent/rag/simple_example.py diff --git a/docs/swarms/structs/agent.md b/docs/swarms/structs/agent.md index 862aa36c..8f729c8f 100644 --- a/docs/swarms/structs/agent.md +++ b/docs/swarms/structs/agent.md @@ -1,26 +1,4 @@ -# `Agent` - -Swarm Agent is a powerful autonomous agent framework designed to connect Language Models (LLMs) with various tools and long-term memory. This class provides the ability to ingest and process various types of documents such as PDFs, text files, Markdown files, JSON files, and more. The Agent structure offers a wide range of features to enhance the capabilities of LLMs and facilitate efficient task execution. - -## Overview - -The `Agent` class establishes a conversational loop with a language model, allowing for interactive task execution, feedback collection, and dynamic response generation. It includes features such as: - -1. **Conversational Loop**: Enables back-and-forth interaction with the model. -2. **Feedback Collection**: Allows users to provide feedback on generated responses. -3. **Stoppable Conversation**: Supports custom stopping conditions for the conversation. -4. **Retry Mechanism**: Implements a retry system for handling issues in response generation. -5. **Tool Integration**: Supports the integration of various tools for enhanced capabilities. -6. **Long-term Memory Management**: Incorporates vector databases for efficient information retrieval. -7. **Document Ingestion**: Processes various document types for information extraction. -8. **Interactive Mode**: Allows real-time communication with the agent. -9. **Sentiment Analysis**: Evaluates the sentiment of generated responses. -10. **Output Filtering and Cleaning**: Ensures generated responses meet specific criteria. -11. **Asynchronous and Concurrent Execution**: Supports efficient parallelization of tasks. -12. **Planning and Reasoning**: Implements techniques like algorithm of thoughts for enhanced decision-making. - - -## Architecture +# `Agent` Structure Reference Documentation ```mermaid graph TD @@ -41,137 +19,170 @@ graph TD L -->|Proceeds to Final LLM Processing| I ``` +The `Agent` class is the core component of the Swarm Agent framework. It serves as an autonomous agent that bridges Language Models (LLMs) with external tools and long-term memory systems. The class is designed to handle a variety of document typesβ€”including PDFs, text files, Markdown, and JSONβ€”enabling robust document ingestion and processing. By integrating these capabilities, the `Agent` class empowers LLMs to perform complex tasks, utilize external resources, and manage information efficiently, making it a versatile solution for advanced autonomous workflows. + + +## Features +The `Agent` class establishes a conversational loop with a language model, allowing for interactive task execution, feedback collection, and dynamic response generation. 
It includes features such as: + +| Feature | Description | +|------------------------------------------|--------------------------------------------------------------------------------------------------| +| **Conversational Loop** | Enables back-and-forth interaction with the model. | +| **Feedback Collection** | Allows users to provide feedback on generated responses. | +| **Stoppable Conversation** | Supports custom stopping conditions for the conversation. | +| **Retry Mechanism** | Implements a retry system for handling issues in response generation. | +| **Tool Integration** | Supports the integration of various tools for enhanced capabilities. | +| **Long-term Memory Management** | Incorporates vector databases for efficient information retrieval. | +| **Document Ingestion** | Processes various document types for information extraction. | +| **Interactive Mode** | Allows real-time communication with the agent. | +| **Sentiment Analysis** | Evaluates the sentiment of generated responses. | +| **Output Filtering and Cleaning** | Ensures generated responses meet specific criteria. | +| **Asynchronous and Concurrent Execution**| Supports efficient parallelization of tasks. | +| **Planning and Reasoning** | Implements techniques like algorithm of thoughts for enhanced decision-making. | + + + + ## `Agent` Attributes -| Attribute | Description | -|-----------|-------------| -| `id` | Unique identifier for the agent instance. | -| `llm` | Language model instance used by the agent. | -| `template` | Template used for formatting responses. | -| `max_loops` | Maximum number of loops the agent can run. | -| `stopping_condition` | Callable function determining when to stop looping. | -| `loop_interval` | Interval (in seconds) between loops. | -| `retry_attempts` | Number of retry attempts for failed LLM calls. | -| `retry_interval` | Interval (in seconds) between retry attempts. | -| `return_history` | Boolean indicating whether to return conversation history. | -| `stopping_token` | Token that stops the agent from looping when present in the response. | -| `dynamic_loops` | Boolean indicating whether to dynamically determine the number of loops. | -| `interactive` | Boolean indicating whether to run in interactive mode. | -| `dashboard` | Boolean indicating whether to display a dashboard. | -| `agent_name` | Name of the agent instance. | -| `agent_description` | Description of the agent instance. | -| `system_prompt` | System prompt used to initialize the conversation. | -| `tools` | List of callable functions representing tools the agent can use. | -| `dynamic_temperature_enabled` | Boolean indicating whether to dynamically adjust the LLM's temperature. | -| `sop` | Standard operating procedure for the agent. | -| `sop_list` | List of strings representing the standard operating procedure. | -| `saved_state_path` | File path for saving and loading the agent's state. | -| `autosave` | Boolean indicating whether to automatically save the agent's state. | -| `context_length` | Maximum length of the context window (in tokens) for the LLM. | -| `user_name` | Name used to represent the user in the conversation. | -| `self_healing_enabled` | Boolean indicating whether to attempt self-healing in case of errors. | -| `code_interpreter` | Boolean indicating whether to interpret and execute code snippets. | -| `multi_modal` | Boolean indicating whether to support multimodal inputs. | -| `pdf_path` | File path of a PDF document to be ingested. 
| -| `list_of_pdf` | List of file paths for PDF documents to be ingested. | -| `tokenizer` | Instance of a tokenizer used for token counting and management. | -| `long_term_memory` | Instance of a `BaseVectorDatabase` implementation for long-term memory management. | -| `preset_stopping_token` | Boolean indicating whether to use a preset stopping token. | -| `traceback` | Object used for traceback handling. | -| `traceback_handlers` | List of traceback handlers. | -| `streaming_on` | Boolean indicating whether to stream responses. | -| `docs` | List of document paths or contents to be ingested. | -| `docs_folder` | Path to a folder containing documents to be ingested. | -| `verbose` | Boolean indicating whether to print verbose output. | -| `parser` | Callable function used for parsing input data. | -| `best_of_n` | Integer indicating the number of best responses to generate. | -| `callback` | Callable function to be called after each agent loop. | -| `metadata` | Dictionary containing metadata for the agent. | -| `callbacks` | List of callable functions to be called during execution. | -| `search_algorithm` | Callable function for long-term memory retrieval. | -| `logs_to_filename` | File path for logging agent activities. | -| `evaluator` | Callable function for evaluating the agent's responses. | -| `stopping_func` | Callable function used as a stopping condition. | -| `custom_loop_condition` | Callable function used as a custom loop condition. | -| `sentiment_threshold` | Float value representing the sentiment threshold for evaluating responses. | -| `custom_exit_command` | String representing a custom command for exiting the agent's loop. | -| `sentiment_analyzer` | Callable function for sentiment analysis on outputs. | -| `limit_tokens_from_string` | Callable function for limiting the number of tokens in a string. | -| `custom_tools_prompt` | Callable function for generating a custom prompt for tool usage. | -| `tool_schema` | Data structure representing the schema for the agent's tools. | -| `output_type` | Type representing the expected output type of responses. | -| `function_calling_type` | String representing the type of function calling. | -| `output_cleaner` | Callable function for cleaning the agent's output. | -| `function_calling_format_type` | String representing the format type for function calling. | -| `list_base_models` | List of base models used for generating tool schemas. | -| `metadata_output_type` | String representing the output type for metadata. | -| `state_save_file_type` | String representing the file type for saving the agent's state. | -| `chain_of_thoughts` | Boolean indicating whether to use the chain of thoughts technique. | -| `algorithm_of_thoughts` | Boolean indicating whether to use the algorithm of thoughts technique. | -| `tree_of_thoughts` | Boolean indicating whether to use the tree of thoughts technique. | -| `tool_choice` | String representing the method for tool selection. | -| `execute_tool` | Boolean indicating whether to execute tools. | -| `rules` | String representing the rules for the agent's behavior. | -| `planning` | Boolean indicating whether to perform planning. | -| `planning_prompt` | String representing the prompt for planning. | -| `device` | String representing the device on which the agent should run. | -| `custom_planning_prompt` | String representing a custom prompt for planning. | -| `memory_chunk_size` | Integer representing the maximum size of memory chunks for long-term memory retrieval. 
| -| `agent_ops_on` | Boolean indicating whether agent operations should be enabled. | -| `return_step_meta` | Boolean indicating whether to return JSON of all steps and additional metadata. | -| `time_created` | Float representing the time the agent was created. | -| `tags` | Optional list of strings for tagging the agent. | -| `use_cases` | Optional list of dictionaries describing use cases for the agent. | -| `step_pool` | List of Step objects representing the agent's execution steps. | -| `print_every_step` | Boolean indicating whether to print every step of execution. | -| `agent_output` | ManySteps object containing the agent's output and metadata. | -| `data_memory` | Optional callable for data memory operations. | -| `load_yaml_path` | String representing the path to a YAML file for loading configurations. | -| `auto_generate_prompt` | Boolean indicating whether to automatically generate prompts. | -| `rag_every_loop` | Boolean indicating whether to query RAG database for context on every loop | -| `plan_enabled` | Boolean indicating whether planning functionality is enabled | -| `artifacts_on` | Boolean indicating whether to save artifacts from agent execution | -| `artifacts_output_path` | File path where artifacts should be saved | -| `artifacts_file_extension` | File extension to use for saved artifacts | -| `all_cores` | Boolean indicating whether to use all CPU cores | -| `device_id` | ID of the GPU device to use if running on GPU | -| `scheduled_run_date` | Optional datetime for scheduling future agent runs | -| `do_not_use_cluster_ops` | Boolean indicating whether to avoid cluster operations | -| `all_gpus` | Boolean indicating whether to use all available GPUs | -| `model_name` | String representing the name of the model to use | -| `llm_args` | Dictionary containing additional arguments for the LLM | -| `load_state_path` | String representing the path to load state from | -| `role` | String representing the role of the agent (e.g., "worker") | -| `print_on` | Boolean indicating whether to print output | -| `tools_list_dictionary` | List of dictionaries representing tool schemas | -| `mcp_url` | String or MCPConnection representing the MCP server URL | -| `mcp_urls` | List of strings representing multiple MCP server URLs | -| `react_on` | Boolean indicating whether to enable ReAct reasoning | -| `safety_prompt_on` | Boolean indicating whether to enable safety prompts | -| `random_models_on` | Boolean indicating whether to randomly select models | -| `mcp_config` | MCPConnection object containing MCP configuration | -| `top_p` | Float representing the top-p sampling parameter | -| `conversation_schema` | ConversationSchema object for conversation formatting | -| `llm_base_url` | String representing the base URL for the LLM API | -| `llm_api_key` | String representing the API key for the LLM | -| `rag_config` | RAGConfig object containing RAG configuration | -| `tool_call_summary` | Boolean indicating whether to summarize tool calls | -| `output_raw_json_from_tool_call` | Boolean indicating whether to output raw JSON from tool calls | -| `summarize_multiple_images` | Boolean indicating whether to summarize multiple image outputs | -| `tool_retry_attempts` | Integer representing the number of retry attempts for tool execution | -| `reasoning_prompt_on` | Boolean indicating whether to enable reasoning prompts | -| `dynamic_context_window` | Boolean indicating whether to dynamically adjust context window | -| `created_at` | Float representing the timestamp when the agent was 
created | -| `workspace_dir` | String representing the workspace directory for the agent | -| `timeout` | Integer representing the timeout for operations in seconds | +| Attribute | Type | Description | +|-----------|------|-------------| +| `id` | `Optional[str]` | Unique identifier for the agent instance. | +| `llm` | `Optional[Any]` | Language model instance used by the agent. | +| `template` | `Optional[str]` | Template used for formatting responses. | +| `max_loops` | `Optional[Union[int, str]]` | Maximum number of loops the agent can run. | +| `stopping_condition` | `Optional[Callable[[str], bool]]` | Callable function determining when to stop looping. | +| `loop_interval` | `Optional[int]` | Interval (in seconds) between loops. | +| `retry_attempts` | `Optional[int]` | Number of retry attempts for failed LLM calls. | +| `retry_interval` | `Optional[int]` | Interval (in seconds) between retry attempts. | +| `return_history` | `Optional[bool]` | Boolean indicating whether to return conversation history. | +| `stopping_token` | `Optional[str]` | Token that stops the agent from looping when present in the response. | +| `dynamic_loops` | `Optional[bool]` | Boolean indicating whether to dynamically determine the number of loops. | +| `interactive` | `Optional[bool]` | Boolean indicating whether to run in interactive mode. | +| `dashboard` | `Optional[bool]` | Boolean indicating whether to display a dashboard. | +| `agent_name` | `Optional[str]` | Name of the agent instance. | +| `agent_description` | `Optional[str]` | Description of the agent instance. | +| `system_prompt` | `Optional[str]` | System prompt used to initialize the conversation. | +| `tools` | `List[Callable]` | List of callable functions representing tools the agent can use. | +| `dynamic_temperature_enabled` | `Optional[bool]` | Boolean indicating whether to dynamically adjust the LLM's temperature. | +| `sop` | `Optional[str]` | Standard operating procedure for the agent. | +| `sop_list` | `Optional[List[str]]` | List of strings representing the standard operating procedure. | +| `saved_state_path` | `Optional[str]` | File path for saving and loading the agent's state. | +| `autosave` | `Optional[bool]` | Boolean indicating whether to automatically save the agent's state. | +| `context_length` | `Optional[int]` | Maximum length of the context window (in tokens) for the LLM. | +| `user_name` | `Optional[str]` | Name used to represent the user in the conversation. | +| `self_healing_enabled` | `Optional[bool]` | Boolean indicating whether to attempt self-healing in case of errors. | +| `code_interpreter` | `Optional[bool]` | Boolean indicating whether to interpret and execute code snippets. | +| `multi_modal` | `Optional[bool]` | Boolean indicating whether to support multimodal inputs. | +| `pdf_path` | `Optional[str]` | File path of a PDF document to be ingested. | +| `list_of_pdf` | `Optional[str]` | List of file paths for PDF documents to be ingested. | +| `tokenizer` | `Optional[Any]` | Instance of a tokenizer used for token counting and management. | +| `long_term_memory` | `Optional[Union[Callable, Any]]` | Instance of a `BaseVectorDatabase` implementation for long-term memory management. | +| `preset_stopping_token` | `Optional[bool]` | Boolean indicating whether to use a preset stopping token. | +| `traceback` | `Optional[Any]` | Object used for traceback handling. | +| `traceback_handlers` | `Optional[Any]` | List of traceback handlers. 
| +| `streaming_on` | `Optional[bool]` | Boolean indicating whether to stream responses. | +| `docs` | `List[str]` | List of document paths or contents to be ingested. | +| `docs_folder` | `Optional[str]` | Path to a folder containing documents to be ingested. | +| `verbose` | `Optional[bool]` | Boolean indicating whether to print verbose output. | +| `parser` | `Optional[Callable]` | Callable function used for parsing input data. | +| `best_of_n` | `Optional[int]` | Integer indicating the number of best responses to generate. | +| `callback` | `Optional[Callable]` | Callable function to be called after each agent loop. | +| `metadata` | `Optional[Dict[str, Any]]` | Dictionary containing metadata for the agent. | +| `callbacks` | `Optional[List[Callable]]` | List of callable functions to be called during execution. | +| `search_algorithm` | `Optional[Callable]` | Callable function for long-term memory retrieval. | +| `logs_to_filename` | `Optional[str]` | File path for logging agent activities. | +| `evaluator` | `Optional[Callable]` | Callable function for evaluating the agent's responses. | +| `stopping_func` | `Optional[Callable]` | Callable function used as a stopping condition. | +| `custom_loop_condition` | `Optional[Callable]` | Callable function used as a custom loop condition. | +| `sentiment_threshold` | `Optional[float]` | Float value representing the sentiment threshold for evaluating responses. | +| `custom_exit_command` | `Optional[str]` | String representing a custom command for exiting the agent's loop. | +| `sentiment_analyzer` | `Optional[Callable]` | Callable function for sentiment analysis on outputs. | +| `limit_tokens_from_string` | `Optional[Callable]` | Callable function for limiting the number of tokens in a string. | +| `custom_tools_prompt` | `Optional[Callable]` | Callable function for generating a custom prompt for tool usage. | +| `tool_schema` | `ToolUsageType` | Data structure representing the schema for the agent's tools. | +| `output_type` | `OutputType` | Type representing the expected output type of responses. | +| `function_calling_type` | `str` | String representing the type of function calling. | +| `output_cleaner` | `Optional[Callable]` | Callable function for cleaning the agent's output. | +| `function_calling_format_type` | `Optional[str]` | String representing the format type for function calling. | +| `list_base_models` | `Optional[List[BaseModel]]` | List of base models used for generating tool schemas. | +| `metadata_output_type` | `str` | String representing the output type for metadata. | +| `state_save_file_type` | `str` | String representing the file type for saving the agent's state. | +| `chain_of_thoughts` | `bool` | Boolean indicating whether to use the chain of thoughts technique. | +| `algorithm_of_thoughts` | `bool` | Boolean indicating whether to use the algorithm of thoughts technique. | +| `tree_of_thoughts` | `bool` | Boolean indicating whether to use the tree of thoughts technique. | +| `tool_choice` | `str` | String representing the method for tool selection. | +| `execute_tool` | `bool` | Boolean indicating whether to execute tools. | +| `rules` | `str` | String representing the rules for the agent's behavior. | +| `planning` | `Optional[str]` | Boolean indicating whether to perform planning. | +| `planning_prompt` | `Optional[str]` | String representing the prompt for planning. | +| `device` | `str` | String representing the device on which the agent should run. 
| +| `custom_planning_prompt` | `str` | String representing a custom prompt for planning. | +| `memory_chunk_size` | `int` | Integer representing the maximum size of memory chunks for long-term memory retrieval. | +| `agent_ops_on` | `bool` | Boolean indicating whether agent operations should be enabled. | +| `return_step_meta` | `Optional[bool]` | Boolean indicating whether to return JSON of all steps and additional metadata. | +| `time_created` | `Optional[str]` | Float representing the time the agent was created. | +| `tags` | `Optional[List[str]]` | Optional list of strings for tagging the agent. | +| `use_cases` | `Optional[List[Dict[str, str]]]` | Optional list of dictionaries describing use cases for the agent. | +| `step_pool` | `List[Step]` | List of Step objects representing the agent's execution steps. | +| `print_every_step` | `Optional[bool]` | Boolean indicating whether to print every step of execution. | +| `agent_output` | `ManySteps` | ManySteps object containing the agent's output and metadata. | +| `data_memory` | `Optional[Callable]` | Optional callable for data memory operations. | +| `load_yaml_path` | `str` | String representing the path to a YAML file for loading configurations. | +| `auto_generate_prompt` | `bool` | Boolean indicating whether to automatically generate prompts. | +| `rag_every_loop` | `bool` | Boolean indicating whether to query RAG database for context on every loop | +| `plan_enabled` | `bool` | Boolean indicating whether planning functionality is enabled | +| `artifacts_on` | `bool` | Boolean indicating whether to save artifacts from agent execution | +| `artifacts_output_path` | `str` | File path where artifacts should be saved | +| `artifacts_file_extension` | `str` | File extension to use for saved artifacts | +| `all_cores` | `bool` | Boolean indicating whether to use all CPU cores | +| `device_id` | `int` | ID of the GPU device to use if running on GPU | +| `scheduled_run_date` | `Optional[datetime]` | Optional datetime for scheduling future agent runs | +| `do_not_use_cluster_ops` | `bool` | Boolean indicating whether to avoid cluster operations | +| `all_gpus` | `bool` | Boolean indicating whether to use all available GPUs | +| `model_name` | `str` | String representing the name of the model to use | +| `llm_args` | `dict` | Dictionary containing additional arguments for the LLM | +| `load_state_path` | `str` | String representing the path to load state from | +| `role` | `agent_roles` | String representing the role of the agent (e.g., "worker") | +| `print_on` | `bool` | Boolean indicating whether to print output | +| `tools_list_dictionary` | `Optional[List[Dict[str, Any]]]` | List of dictionaries representing tool schemas | +| `mcp_url` | `Optional[Union[str, MCPConnection]]` | String or MCPConnection representing the MCP server URL | +| `mcp_urls` | `List[str]` | List of strings representing multiple MCP server URLs | +| `react_on` | `bool` | Boolean indicating whether to enable ReAct reasoning | +| `safety_prompt_on` | `bool` | Boolean indicating whether to enable safety prompts | +| `random_models_on` | `bool` | Boolean indicating whether to randomly select models | +| `mcp_config` | `Optional[MCPConnection]` | MCPConnection object containing MCP configuration | +| `top_p` | `Optional[float]` | Float representing the top-p sampling parameter | +| `conversation_schema` | `Optional[ConversationSchema]` | ConversationSchema object for conversation formatting | +| `llm_base_url` | `Optional[str]` | String representing the base URL for 
the LLM API | +| `llm_api_key` | `Optional[str]` | String representing the API key for the LLM | +| `tool_call_summary` | `bool` | Boolean indicating whether to summarize tool calls | +| `output_raw_json_from_tool_call` | `bool` | Boolean indicating whether to output raw JSON from tool calls | +| `summarize_multiple_images` | `bool` | Boolean indicating whether to summarize multiple image outputs | +| `tool_retry_attempts` | `int` | Integer representing the number of retry attempts for tool execution | +| `reasoning_prompt_on` | `bool` | Boolean indicating whether to enable reasoning prompts | +| `dynamic_context_window` | `bool` | Boolean indicating whether to dynamically adjust context window | +| `show_tool_execution_output` | `bool` | Boolean indicating whether to show tool execution output | +| `created_at` | `float` | Float representing the timestamp when the agent was created | +| `workspace_dir` | `str` | String representing the workspace directory for the agent | +| `timeout` | `Optional[int]` | Integer representing the timeout for operations in seconds | +| `temperature` | `float` | Float representing the temperature for the LLM | +| `max_tokens` | `int` | Integer representing the maximum number of tokens | +| `frequency_penalty` | `float` | Float representing the frequency penalty | +| `presence_penalty` | `float` | Float representing the presence penalty | +| `tool_system_prompt` | `str` | String representing the system prompt for tools | +| `log_directory` | `str` | String representing the directory for logs | ## `Agent` Methods | Method | Description | Inputs | Usage Example | |--------|-------------|--------|----------------| -| `run(task, img=None, imgs=None, correct_answer=None, streaming_callback=None, *args, **kwargs)` | Runs the autonomous agent loop to complete the given task. | `task` (str): The task to be performed.
`img` (str, optional): Path to an image file.
`imgs` (List[str], optional): List of image paths.
`correct_answer` (str, optional): Expected correct answer for validation.
`streaming_callback` (Callable, optional): Callback for streaming tokens.
`*args`, `**kwargs`: Additional arguments. | `response = agent.run("Generate a report on financial performance.")` | +| `run(task, img=None, imgs=None, correct_answer=None, streaming_callback=None, *args, **kwargs)` | Runs the autonomous agent loop to complete the given task with enhanced parameters. | `task` (str): The task to be performed.
`img` (str, optional): Path to a single image file.
`imgs` (List[str], optional): List of image paths for batch processing.
`correct_answer` (str, optional): Expected correct answer for validation with automatic retries.
`streaming_callback` (Callable, optional): Callback function for real-time token streaming.
`*args`, `**kwargs`: Additional arguments. | `response = agent.run("Generate a report on financial performance.")` | | `run_batched(tasks, imgs=None, *args, **kwargs)` | Runs multiple tasks concurrently in batch mode. | `tasks` (List[str]): List of tasks to run.
`imgs` (List[str], optional): List of images to process.
`*args`, `**kwargs`: Additional arguments. | `responses = agent.run_batched(["Task 1", "Task 2"])` | +| `run_multiple_images(task, imgs, *args, **kwargs)` | Runs the agent with multiple images using concurrent processing. | `task` (str): The task to perform on each image.
`imgs` (List[str]): List of image paths or URLs.
`*args`, `**kwargs`: Additional arguments. | `outputs = agent.run_multiple_images("Describe image", ["img1.jpg", "img2.png"])` | +| `continuous_run_with_answer(task, img=None, correct_answer=None, max_attempts=10)` | Runs the agent until the correct answer is provided. | `task` (str): The task to perform.
`img` (str, optional): Path to an image file to process.
`correct_answer` (str): The expected answer to validate the response against.
`max_attempts` (int): Maximum attempts. | `response = agent.continuous_run_with_answer("Math problem", correct_answer="42")` | +| `tool_execution_retry(response, loop_count)` | Executes tools with retry logic for handling failures. | `response` (any): Response containing tool calls.
`loop_count` (int): Current loop number. | `agent.tool_execution_retry(response, 1)` | | `__call__(task, img=None, *args, **kwargs)` | Alternative way to call the `run` method. | Same as `run`. | `response = agent("Generate a report on financial performance.")` | | `parse_and_execute_tools(response, *args, **kwargs)` | Parses the agent's response and executes any tools mentioned in it. | `response` (str): The agent's response to be parsed.
`*args`, `**kwargs`: Additional arguments. | `agent.parse_and_execute_tools(response)` | | `add_memory(message)` | Adds a message to the agent's memory. | `message` (str): The message to add. | `agent.add_memory("Important information")` | @@ -249,70 +260,118 @@ graph TD -## Updated Run Method +## `Agent.run(*args, **kwargs)` -The run method has been updated with new parameters for enhanced functionality: +The `run` method has been significantly enhanced with new parameters for advanced functionality: -| Method | Description | Inputs | Usage Example | -|--------|-------------|--------|----------------| -| `run(task, img=None, imgs=None, correct_answer=None, streaming_callback=None, *args, **kwargs)` | Runs the agent with enhanced parameters | `task` (str): Task to run
`img` (str, optional): Single image path
`imgs` (List[str], optional): List of image paths
`correct_answer` (str, optional): Expected answer for validation
`streaming_callback` (Callable, optional): Callback for streaming tokens
`*args`, `**kwargs`: Additional arguments | `agent.run("Analyze data", imgs=["img1.jpg", "img2.png"])` | +### Method Signature +```python +def run( + self, + task: Optional[Union[str, Any]] = None, + img: Optional[str] = None, + imgs: Optional[List[str]] = None, + correct_answer: Optional[str] = None, + streaming_callback: Optional[Callable[[str], None]] = None, + *args, + **kwargs, +) -> Any: +``` +### Parameters +| Parameter | Type | Description | Default | +|-----------|------|-------------|---------| +| `task` | `Optional[Union[str, Any]]` | The task to be executed | `None` | +| `img` | `Optional[str]` | Path to a single image file | `None` | +| `imgs` | `Optional[List[str]]` | List of image paths for batch processing | `None` | +| `correct_answer` | `Optional[str]` | Expected correct answer for validation with automatic retries | `None` | +| `streaming_callback` | `Optional[Callable[[str], None]]` | Callback function to receive streaming tokens in real-time | `None` | +| `*args` | `Any` | Additional positional arguments | - | +| `**kwargs` | `Any` | Additional keyword arguments | - | -## Getting Started +### Examples -To use the Swarm Agent, first install the required dependencies: -```bash -pip3 install -U swarms -``` +```python +# --- Enhanced Run Method Examples --- -Then, you can initialize and use the agent as follows: +# Basic Usage +# Simple task execution +response = agent.run("Generate a report on financial performance.") -```python -from swarms.structs.agent import Agent -from swarms.prompts.finance_agent_sys_prompt import FINANCIAL_AGENT_SYS_PROMPT +# Single Image Processing +# Process a single image +response = agent.run( + task="Analyze this image and describe what you see", + img="path/to/image.jpg" +) -# Initialize the Financial Analysis Agent with GPT-4o-mini model -agent = Agent( - agent_name="Financial-Analysis-Agent", - system_prompt=FINANCIAL_AGENT_SYS_PROMPT, - model_name="gpt-4o-mini", - max_loops=1, - autosave=True, - dashboard=False, - verbose=True, - dynamic_temperature_enabled=True, - saved_state_path="finance_agent.json", - user_name="swarms_corp", - retry_attempts=1, - context_length=200000, - return_step_meta=False, - output_type="str", +# Multiple Image Processing +# Process multiple images concurrently +response = agent.run( + task="Analyze these images and identify common patterns", + imgs=["image1.jpg", "image2.png", "image3.jpeg"] ) -# Run the agent +# Answer Validation with Retries +# Run until correct answer is found response = agent.run( - "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria?" 
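+    # correct_answer enables validation: the agent retries automatically
+    # until it produces the expected answer (see continuous_run_with_answer)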
+ task="What is the capital of France?", + correct_answer="Paris" +) + +# Real-time Streaming +def streaming_callback(token: str): + print(token, end="", flush=True) + +response = agent.run( + task="Tell me a long story about space exploration", + streaming_callback=streaming_callback ) -print(response) +# Combined Parameters +# Complex task with multiple features +response = agent.run( + task="Analyze these financial charts and provide insights", + imgs=["chart1.png", "chart2.png", "chart3.png"], + correct_answer="market volatility", + streaming_callback=my_callback +) ``` -## Advanced Usage +### Return Types + +The `run` method returns different types based on the input parameters: + +| Scenario | Return Type | Description | +|-----------------------|-----------------------------------------------|---------------------------------------------------------| +| Single task | `str` | Returns the agent's response | +| Multiple images | `List[Any]` | Returns a list of results, one for each image | +| Answer validation | `str` | Returns the correct answer as a string | +| Streaming | `str` | Returns the complete response after streaming completes | + + + +## Advanced Capabilities ### Tool Integration -To integrate tools with the Swarm `Agent`, you can pass a list of callable functions with types and doc strings to the `tools` parameter when initializing the `Agent` instance. The agent will automatically convert these functions into an OpenAI function calling schema and make them available for use during task execution. +The `Agent` class allows seamless integration of external tools by accepting a list of Python functions via the `tools` parameter during initialization. Each tool function must include type annotations and a docstring. The `Agent` class automatically converts these functions into an OpenAI-compatible function calling schema, making them accessible for use during task execution. + +Learn more about tools [here](https://docs.swarms.world/en/latest/swarms/tools/tools_examples/) ## Requirements for a tool -- Function - - With types - - with doc strings + +| Requirement | Description | +|---------------------|------------------------------------------------------------------| +| Function | The tool must be a Python function. | +| With types | The function must have type annotations for its parameters. | +| With doc strings | The function must include a docstring describing its behavior. | +| Must return a string| The function must return a string value. | ```python from swarms import Agent -from swarm_models import OpenAIChat import subprocess def terminal(code: str): @@ -331,7 +390,7 @@ def terminal(code: str): # Initialize the agent with a tool agent = Agent( agent_name="Terminal-Agent", - llm=OpenAIChat(api_key=os.getenv("OPENAI_API_KEY")), + model_name="claude-sonnet-4-20250514", tools=[terminal], system_prompt="You are an agent that can execute terminal commands. 
Use the tools provided to assist the user.", ) @@ -347,7 +406,6 @@ The Swarm Agent supports integration with vector databases for long-term memory ```python from swarms import Agent -from swarm_models import Anthropic from swarms_memory import ChromaDB # Initialize ChromaDB @@ -359,7 +417,7 @@ chromadb = ChromaDB( # Initialize the agent with long-term memory agent = Agent( agent_name="Financial-Analysis-Agent", - llm=Anthropic(anthropic_api_key=os.getenv("ANTHROPIC_API_KEY")), + model_name="claude-sonnet-4-20250514", long_term_memory=chromadb, system_prompt="You are a financial analysis agent with access to long-term memory.", ) @@ -376,7 +434,7 @@ To enable interactive mode, set the `interactive` parameter to `True` when initi ```python agent = Agent( agent_name="Interactive-Agent", - llm=OpenAIChat(api_key=os.getenv("OPENAI_API_KEY")), + model_name="claude-sonnet-4-20250514", interactive=True, system_prompt="You are an interactive agent. Engage in a conversation with the user.", ) @@ -385,31 +443,6 @@ agent = Agent( agent.run("Let's start a conversation") ``` -### Sentiment Analysis - -To perform sentiment analysis on the agent's outputs, you can provide a sentiment analyzer function: - -```python -from textblob import TextBlob - -def sentiment_analyzer(text): - analysis = TextBlob(text) - return analysis.sentiment.polarity - -agent = Agent( - agent_name="Sentiment-Analysis-Agent", - llm=OpenAIChat(api_key=os.getenv("OPENAI_API_KEY")), - sentiment_analyzer=sentiment_analyzer, - sentiment_threshold=0.5, - system_prompt="You are an agent that generates responses with sentiment analysis.", -) - -response = agent.run("Generate a positive statement about AI") -print(response) -``` - - - ### Undo Functionality ```python @@ -468,6 +501,29 @@ print(f"Completed {len(batch_responses)} tasks in batch mode") The new `run_batched` method allows you to process multiple tasks efficiently: +#### Method Signature + +```python +def run_batched( + self, + tasks: List[str], + imgs: List[str] = None, + *args, + **kwargs, +) -> List[Any]: +``` + +#### Parameters + +| Parameter | Type | Description | Default | +|-----------|------|-------------|---------| +| `tasks` | `List[str]` | List of tasks to run concurrently | Required | +| `imgs` | `List[str]` | List of images to process with each task | `None` | +| `*args` | `Any` | Additional positional arguments | - | +| `**kwargs` | `Any` | Additional keyword arguments | - | + +#### Usage Examples + ```python # Process multiple tasks in batch tasks = [ @@ -485,6 +541,27 @@ for i, (task, result) in enumerate(zip(tasks, batch_results)): print(f"Result: {result}\n") ``` +#### Batch Processing with Images + +```python +# Process multiple tasks with multiple images +tasks = [ + "Analyze this chart for trends", + "Identify patterns in this data visualization", + "Summarize the key insights from this graph" +] + +images = ["chart1.png", "chart2.png", "chart3.png"] + +# Each task will process all images +batch_results = agent.run_batched(tasks, imgs=images) +``` + +#### Return Type + +- **Returns**: `List[Any]` - List of results from each task execution +- **Order**: Results are returned in the same order as the input tasks + ### Various other settings ```python @@ -540,35 +617,27 @@ agent.model_dump_json() print(agent.to_toml()) ``` -## Auto Generate Prompt + CPU Execution + +## Examples + +### Auto Generate Prompt + CPU Execution ```python import os from swarms import Agent -from swarm_models import OpenAIChat from dotenv import load_dotenv # Load environment variables 
load_dotenv() -# Retrieve the OpenAI API key from the environment variable -api_key = os.getenv("GROQ_API_KEY") - -# Initialize the model for OpenAI Chat -model = OpenAIChat( - openai_api_base="https://api.groq.com/openai/v1", - openai_api_key=api_key, - model_name="llama-3.1-70b-versatile", - temperature=0.1, -) - # Initialize the agent with automated prompt engineering enabled agent = Agent( agent_name="Financial-Analysis-Agent", system_prompt=None, # System prompt is dynamically generated + model_name="gpt-4.1", agent_description=None, llm=model, max_loops=1, @@ -587,22 +656,23 @@ agent = Agent( # Run the agent with a task description and specify the device agent.run( "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria", - ## Will design a system prompt based on the task if description and system prompt are None - device="cpu", ) # Print the dynamically generated system prompt print(agent.system_prompt) - ``` ## Agent Structured Outputs - Create a structured output schema for the agent [List[Dict]] + - Input in the `tools_list_dictionary` parameter + - Output is a dictionary + - Use the `str_to_dict` function to convert the output to a dictionary + ```python from dotenv import load_dotenv @@ -673,6 +743,170 @@ print(type(str_to_dict(out))) ``` +## Comprehensive Agent Configuration Examples + +### Advanced Agent with All New Features + +```python +from swarms import Agent +from swarms_memory import ChromaDB +from datetime import datetime, timedelta + +# Initialize advanced agent with comprehensive configuration +agent = Agent( + # Basic Configuration + agent_name="Advanced-Analysis-Agent", + agent_description="Multi-modal analysis agent with advanced capabilities", + system_prompt="You are an advanced analysis agent capable of processing multiple data types.", + + # Enhanced Run Parameters + max_loops=3, + dynamic_loops=True, + interactive=False, + dashboard=True, + + # Device and Resource Management + device="gpu", + device_id=0, + all_cores=True, + all_gpus=False, + do_not_use_cluster_ops=True, + + # Memory and Context Management + context_length=100000, + memory_chunk_size=3000, + dynamic_context_window=True, + rag_every_loop=True, + + # Advanced Features + auto_generate_prompt=True, + plan_enabled=True, + react_on=True, + safety_prompt_on=True, + reasoning_prompt_on=True, + + # Tool Management + tool_retry_attempts=5, + tool_call_summary=True, + show_tool_execution_output=True, + function_calling_format_type="OpenAI", + + # Artifacts and Output + artifacts_on=True, + artifacts_output_path="./outputs", + artifacts_file_extension=".md", + output_type="json", + + # LLM Configuration + model_name="gpt-4o", + temperature=0.3, + max_tokens=8000, + top_p=0.95, + frequency_penalty=0.1, + presence_penalty=0.1, + + # MCP Integration + mcp_url="http://localhost:8000", + mcp_config=None, + + # Performance Settings + timeout=300, + retry_attempts=3, + retry_interval=2, + + # Scheduling + scheduled_run_date=datetime.now() + timedelta(hours=1), + + # Metadata and Organization + tags=["analysis", "multi-modal", "advanced"], + use_cases=[{"name": "Data Analysis", "description": "Process and analyze complex datasets"}], + + # Verbosity and Logging + verbose=True, + print_on=True, + print_every_step=True, + log_directory="./logs" +) + +# Run with multiple images and streaming +def streaming_callback(token: str): + print(token, end="", flush=True) + +response = agent.run( + task="Analyze these financial charts and provide comprehensive insights", + 
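+    # imgs enables batch processing across all listed images, while
+    # streaming_callback receives tokens in real time as they are generated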
imgs=["chart1.png", "chart2.png", "chart3.png"], + streaming_callback=streaming_callback +) + +# Run batch processing +tasks = [ + "Analyze Q1 financial performance", + "Generate Q2 projections", + "Create executive summary" +] + +batch_results = agent.run_batched(tasks) + +# Run with answer validation +validated_response = agent.run( + task="What is the current market trend?", + correct_answer="bullish", + max_attempts=5 +) +``` + +### MCP-Enabled Agent Example + +```python +from swarms import Agent +from swarms.schemas.mcp_schemas import MCPConnection + +# Configure MCP connection +mcp_config = MCPConnection( + server_path="http://localhost:8000", + server_name="my_mcp_server", + capabilities=["tools", "filesystem"] +) + +# Initialize agent with MCP integration +mcp_agent = Agent( + agent_name="MCP-Enabled-Agent", + system_prompt="You are an agent with access to external tools via MCP.", + mcp_config=mcp_config, + mcp_urls=["http://localhost:8000", "http://localhost:8001"], + tool_call_summary=True, + output_raw_json_from_tool_call=True +) + +# Run with MCP tools +response = mcp_agent.run("Use the available tools to analyze the current system status") +``` + +### Multi-Image Processing Agent + +```python +# Initialize agent optimized for image processing +image_agent = Agent( + agent_name="Image-Analysis-Agent", + system_prompt="You are an expert at analyzing images and extracting insights.", + multi_modal=True, + summarize_multiple_images=True, + artifacts_on=True, + artifacts_output_path="./image_analysis", + artifacts_file_extension=".txt" +) + +# Process multiple images with summarization +images = ["product1.jpg", "product2.jpg", "product3.jpg"] +analysis = image_agent.run( + task="Analyze these product images and identify design patterns", + imgs=images +) + +# The agent will automatically summarize results if summarize_multiple_images=True +print(f"Analysis complete: {len(analysis)} images processed") +``` + ## New Features and Parameters ### Enhanced Run Method Parameters @@ -685,43 +919,100 @@ The `run` method now supports several new parameters for advanced functionality: ### MCP (Model Context Protocol) Integration -New parameters enable seamless MCP server integration: - -- **`mcp_url`**: Connect to a single MCP server -- **`mcp_urls`**: Connect to multiple MCP servers -- **`mcp_config`**: Advanced MCP configuration options +| Parameter | Description | +|----------------|-----------------------------------------------------| +| `mcp_url` | Connect to a single MCP server | +| `mcp_urls` | Connect to multiple MCP servers | +| `mcp_config` | Advanced MCP configuration options | ### Advanced Reasoning and Safety -- **`react_on`**: Enable ReAct reasoning for complex problem-solving -- **`safety_prompt_on`**: Add safety constraints to agent responses -- **`reasoning_prompt_on`**: Enable multi-loop reasoning for complex tasks +| Parameter | Description | +|----------------------|--------------------------------------------------------------------| +| `react_on` | Enable ReAct reasoning for complex problem-solving | +| `safety_prompt_on` | Add safety constraints to agent responses | +| `reasoning_prompt_on`| Enable multi-loop reasoning for complex tasks | ### Performance and Resource Management -- **`dynamic_context_window`**: Automatically adjust context window based on available tokens -- **`tool_retry_attempts`**: Configure retry behavior for tool execution -- **`summarize_multiple_images`**: Automatically summarize results from multiple image processing +| Parameter | Description 
| +|--------------------------|--------------------------------------------------------------------------| +| `dynamic_context_window` | Automatically adjust context window based on available tokens | +| `tool_retry_attempts` | Configure retry behavior for tool execution | +| `summarize_multiple_images` | Automatically summarize results from multiple image processing | + +### Device and Resource Management + +| Parameter | Description | +|--------------------------|--------------------------------------------------------------------------| +| `device` | Specify CPU or GPU execution (`"cpu"` or `"gpu"`) | +| `device_id` | Specify which GPU device to use | +| `all_cores` | Enable use of all available CPU cores | +| `all_gpus` | Enable use of all available GPUs | +| `do_not_use_cluster_ops` | Control cluster operation usage | + +### Advanced Memory and Context + +| Parameter | Description | +|--------------------------|--------------------------------------------------------------------------| +| `rag_every_loop` | Query RAG database on every loop iteration | +| `memory_chunk_size` | Control memory chunk size for long-term memory | +| `auto_generate_prompt` | Automatically generate system prompts based on tasks | +| `plan_enabled` | Enable planning functionality for complex tasks | + +### Artifacts and Output Management + +| Parameter | Description | +|--------------------------|--------------------------------------------------------------------------| +| `artifacts_on` | Enable saving artifacts from agent execution | +| `artifacts_output_path` | Specify where to save artifacts | +| `artifacts_file_extension` | Control artifact file format | +| `output_raw_json_from_tool_call` | Control tool call output format | + +### Enhanced Tool Management + +| Parameter | Description | +|--------------------------|--------------------------------------------------------------------------| +| `tools_list_dictionary` | Provide tool schemas in dictionary format | +| `tool_call_summary` | Enable automatic summarization of tool calls | +| `show_tool_execution_output` | Control visibility of tool execution details | +| `function_calling_format_type` | Specify function calling format (OpenAI, etc.) | + +### Advanced LLM Configuration + +| Parameter | Description | +|--------------------------|--------------------------------------------------------------------------| +| `llm_args` | Pass additional arguments to the LLM | +| `llm_base_url` | Specify custom LLM API endpoint | +| `llm_api_key` | Provide LLM API key directly | +| `top_p` | Control top-p sampling parameter | +| `frequency_penalty` | Control frequency penalty | +| `presence_penalty` | Control presence penalty | + + + ## Best Practices -1. Always provide a clear and concise `system_prompt` to guide the agent's behavior. -2. Use `tools` to extend the agent's capabilities for specific tasks. -3. Implement error handling and utilize the `retry_attempts` feature for robust execution. -4. Leverage `long_term_memory` for tasks that require persistent information. -5. Use `interactive` mode for real-time conversations and `dashboard` for monitoring. -6. Implement `sentiment_analysis` for applications requiring tone management. -7. Utilize `autosave` and `save`/`load` methods for continuity across sessions. -8. Optimize token usage with `dynamic_context_window` and `tokens_checks` methods. -9. Use `concurrent` and `async` methods for performance-critical applications. -10. Regularly review and analyze feedback using the `analyze_feedback` method. -11. 
Use `artifacts_on` to save important outputs from agent execution -12. Configure `device` and `device_id` appropriately for optimal performance -13. Enable `rag_every_loop` when continuous context from long-term memory is needed -14. Use `scheduled_run_date` for automated task scheduling -15. Leverage `run_batched` for efficient processing of multiple related tasks -16. Use `mcp_url` or `mcp_urls` to extend agent capabilities with external tools -17. Enable `react_on` for complex reasoning tasks requiring step-by-step analysis -18. Configure `tool_retry_attempts` for robust tool execution in production environments +| Best Practice / Feature | Description | +|---------------------------------------------------------|--------------------------------------------------------------------------------------------------| +| `system_prompt` | Always provide a clear and concise system prompt to guide the agent's behavior. | +| `tools` | Use tools to extend the agent's capabilities for specific tasks. | +| `retry_attempts` & error handling | Implement error handling and utilize the retry_attempts feature for robust execution. | +| `long_term_memory` | Leverage long_term_memory for tasks that require persistent information. | +| `interactive` & `dashboard` | Use interactive mode for real-time conversations and dashboard for monitoring. | +| `sentiment_analysis` | Implement sentiment_analysis for applications requiring tone management. | +| `autosave`, `save`/`load` | Utilize autosave and save/load methods for continuity across sessions. | +| `dynamic_context_window` & `tokens_checks` | Optimize token usage with dynamic_context_window and tokens_checks methods. | +| `concurrent` & `async` methods | Use concurrent and async methods for performance-critical applications. | +| `analyze_feedback` | Regularly review and analyze feedback using the analyze_feedback method. | +| `artifacts_on` | Use artifacts_on to save important outputs from agent execution. | +| `device` & `device_id` | Configure device and device_id appropriately for optimal performance. | +| `rag_every_loop` | Enable rag_every_loop when continuous context from long-term memory is needed. | +| `scheduled_run_date` | Use scheduled_run_date for automated task scheduling. | +| `run_batched` | Leverage run_batched for efficient processing of multiple related tasks. | +| `mcp_url` or `mcp_urls` | Use mcp_url or mcp_urls to extend agent capabilities with external tools. | +| `react_on` | Enable react_on for complex reasoning tasks requiring step-by-step analysis. | +| `tool_retry_attempts` | Configure tool_retry_attempts for robust tool execution in production environments. | By following these guidelines and leveraging the Swarm Agent's extensive features, you can create powerful, flexible, and efficient autonomous agents for a wide range of applications. \ No newline at end of file diff --git a/examples/single_agent/rag/README.md b/examples/single_agent/rag/README.md new file mode 100644 index 00000000..54a5770e --- /dev/null +++ b/examples/single_agent/rag/README.md @@ -0,0 +1,230 @@ +# Qdrant RAG Example with Document Ingestion + +This example demonstrates how to use the agent structure from `example.py` with Qdrant RAG to ingest a vast array of PDF documents and text files for advanced quantitative trading analysis. 
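+
+## ⚑ Quick Start
+
+A condensed end-to-end session using the classes defined in `qdrant_rag_example.py` (each step is covered in detail in the usage sections below):
+
+```python
+from qdrant_rag_example import QuantitativeTradingRAGAgent
+
+# Initialize the agent with its Qdrant-backed memory
+agent = QuantitativeTradingRAGAgent(
+    agent_name="Financial-Analysis-Agent",
+    collection_name="financial_documents",
+)
+
+# Ingest a directory of PDF, TXT, and Markdown documents
+agent.ingest_documents("./financial_documents")
+
+# Ask a question grounded in the ingested documents
+print(agent.run_analysis("What are the best top 3 ETFs for gold coverage?"))
+```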
+ +## πŸš€ Features + +- **Document Ingestion**: Process PDF, TXT, and Markdown files automatically +- **Qdrant Vector Database**: High-performance vector storage with similarity search +- **Sentence Transformer Embeddings**: Local embedding generation using state-of-the-art models +- **Intelligent Chunking**: Smart text chunking with overlap for better retrieval +- **Concurrent Processing**: Multi-threaded document processing for large collections +- **RAG Integration**: Seamless integration with Swarms Agent framework +- **Financial Analysis**: Specialized for quantitative trading and financial research + +## πŸ“‹ Prerequisites + +- Python 3.10+ +- Qdrant client (local or cloud) +- Sentence transformers for embeddings +- Swarms framework + +## πŸ› οΈ Installation + +1. **Install dependencies**: + ```bash + pip install -r requirements.txt + ``` + +2. **Set up environment variables** (optional, for cloud deployment): + ```bash + export QDRANT_URL="your_qdrant_url" + export QDRANT_API_KEY="your_qdrant_api_key" + ``` + +## πŸ—οΈ Architecture + +The example consists of three main components: + +### 1. DocumentProcessor +- Handles file discovery and text extraction +- Supports PDF, TXT, and Markdown formats +- Concurrent processing for large document collections +- Error handling and validation + +### 2. QdrantRAGMemory +- Vector database management with Qdrant +- Intelligent text chunking with overlap +- Semantic search capabilities +- Metadata storage and retrieval + +### 3. QuantitativeTradingRAGAgent +- Combines Swarms Agent with RAG capabilities +- Financial analysis specialization +- Document context enhancement +- Query processing and response generation + +## πŸ“– Usage + +### Basic Setup + +```python +from qdrant_rag_example import QuantitativeTradingRAGAgent + +# Initialize the agent +agent = QuantitativeTradingRAGAgent( + agent_name="Financial-Analysis-Agent", + collection_name="financial_documents", + model_name="claude-sonnet-4-20250514" +) +``` + +### Document Ingestion + +```python +# Ingest documents from a directory +documents_path = "./financial_documents" +num_ingested = agent.ingest_documents(documents_path) +print(f"Ingested {num_ingested} documents") +``` + +### Querying Documents + +```python +# Search for relevant information +results = agent.query_documents("gold ETFs investment strategies", limit=5) +for result in results: + print(f"Document: {result['document_name']}") + print(f"Relevance: {result['similarity_score']:.3f}") + print(f"Content: {result['chunk_text'][:200]}...") +``` + +### Running Analysis + +```python +# Run financial analysis with RAG context +task = "What are the best top 3 ETFs for gold coverage?" 
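+# The agent retrieves the most relevant document chunks and uses them as context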
+response = agent.run_analysis(task) +print(response) +``` + +## πŸ“ Directory Structure + +``` +financial_documents/ +β”œβ”€β”€ research_papers/ +β”‚ β”œβ”€β”€ gold_etf_analysis.pdf +β”‚ β”œβ”€β”€ market_research.pdf +β”‚ └── portfolio_strategies.pdf +β”œβ”€β”€ company_reports/ +β”‚ β”œβ”€β”€ annual_reports.txt +β”‚ └── quarterly_updates.md +└── market_data/ + β”œβ”€β”€ historical_prices.csv + └── volatility_analysis.txt +``` + +## βš™οΈ Configuration Options + +### Agent Configuration +- `agent_name`: Name of the agent +- `collection_name`: Qdrant collection name +- `model_name`: LLM model to use +- `max_loops`: Maximum agent execution loops +- `chunk_size`: Text chunk size (default: 1000) +- `chunk_overlap`: Overlap between chunks (default: 200) + +### Document Processing +- `supported_extensions`: File types to process +- `max_workers`: Concurrent processing threads +- `score_threshold`: Similarity search threshold + +## πŸ” Advanced Features + +### Custom Embedding Models +```python +# Use different sentence transformer models +from sentence_transformers import SentenceTransformer + +custom_model = SentenceTransformer("all-mpnet-base-v2") +# Update the embedding model in QdrantRAGMemory +``` + +### Cloud Deployment +```python +# Connect to Qdrant cloud +agent = QuantitativeTradingRAGAgent( + qdrant_url="https://your-instance.qdrant.io", + qdrant_api_key="your_api_key" +) +``` + +### Batch Processing +```python +# Process multiple directories +directories = ["./docs1", "./docs2", "./docs3"] +for directory in directories: + agent.ingest_documents(directory) +``` + +## πŸ“Š Performance Considerations + +- **Chunk Size**: Larger chunks (1000-2000 chars) for detailed analysis, smaller (500-1000) for precise retrieval +- **Overlap**: 10-20% overlap between chunks for better context continuity +- **Concurrency**: Adjust `max_workers` based on your system capabilities +- **Vector Size**: 768 dimensions for sentence-transformers, 1536 for OpenAI embeddings + +## 🚨 Error Handling + +The system includes comprehensive error handling for: +- File not found errors +- Unsupported file types +- Processing failures +- Network connectivity issues +- Invalid document content + +## πŸ”§ Troubleshooting + +### Common Issues + +1. **Import Errors**: Ensure all dependencies are installed + ```bash + pip install -r requirements.txt + ``` + +2. **Memory Issues**: Reduce chunk size or use cloud Qdrant + ```python + agent = QuantitativeTradingRAGAgent(chunk_size=500) + ``` + +3. **Processing Failures**: Check file permissions and formats + ```python + # Verify supported formats + processor = DocumentProcessor(supported_extensions=['.pdf', '.txt']) + ``` + +### Performance Optimization + +- Use SSD storage for document processing +- Increase `max_workers` for multi-core systems +- Consider cloud Qdrant for large document collections +- Implement document caching for frequently accessed files + +## πŸ“ˆ Use Cases + +- **Financial Research**: Analyze market reports, earnings calls, and research papers +- **Legal Document Review**: Process contracts, regulations, and case law +- **Academic Research**: Index research papers and academic literature +- **Compliance Monitoring**: Track regulatory changes and compliance requirements +- **Risk Assessment**: Analyze risk reports and market analysis + +## 🀝 Contributing + +To extend this example: +1. Add support for additional file formats +2. Implement custom embedding strategies +3. Add document versioning and change tracking +4. 
Integrate with other vector databases +5. Add document summarization capabilities + +## πŸ“„ License + +This example is part of the Swarms framework and follows the same licensing terms. + +## πŸ†˜ Support + +For issues and questions: +- Check the Swarms documentation +- Review the example code and error messages +- Ensure all dependencies are properly installed +- Verify Qdrant connection and configuration diff --git a/examples/single_agent/rag/qdrant_rag_example.py b/examples/single_agent/rag/qdrant_rag_example.py new file mode 100644 index 00000000..2b88cf4a --- /dev/null +++ b/examples/single_agent/rag/qdrant_rag_example.py @@ -0,0 +1,882 @@ +""" +Qdrant RAG Example with Document Ingestion + +This example demonstrates how to use the agent structure from example.py with Qdrant RAG +to ingest a vast array of PDF documents and text files for advanced quantitative trading analysis. + +Features: +- Document ingestion from multiple file types (PDF, TXT, MD) +- Qdrant vector database integration +- Sentence transformer embeddings +- Comprehensive document processing pipeline +- Agent with RAG capabilities for financial analysis +""" + +import os +import uuid +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Optional, Union +import concurrent.futures +from concurrent.futures import ThreadPoolExecutor + +from qdrant_client import QdrantClient +from qdrant_client.http import models +from qdrant_client.http.models import Distance, VectorParams +from sentence_transformers import SentenceTransformer + +from swarms import Agent +from swarms.utils.pdf_to_text import pdf_to_text +from swarms.utils.data_to_text import data_to_text + + +class DocumentProcessor: + """ + Handles document processing and text extraction from various file formats. + + This class provides functionality to process PDF, TXT, and Markdown files, + extracting text content for vectorization and storage in the RAG system. + """ + + def __init__( + self, supported_extensions: Optional[List[str]] = None + ): + """ + Initialize the DocumentProcessor. + + Args: + supported_extensions: List of supported file extensions. + Defaults to ['.pdf', '.txt', '.md'] + """ + if supported_extensions is None: + supported_extensions = [".pdf", ".txt", ".md"] + + self.supported_extensions = supported_extensions + + def process_document( + self, file_path: Union[str, Path] + ) -> Optional[Dict[str, str]]: + """ + Process a single document and extract its text content. 
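+
+        PDF files are extracted with pdf_to_text and other supported
+        formats with data_to_text; if the format-specific extractor
+        fails, the file is re-read as plain UTF-8 text (decode errors
+        ignored) as a fallback.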
+ + Args: + file_path: Path to the document file + + Returns: + Dictionary containing document metadata and extracted text, or None if processing fails + """ + file_path = Path(file_path) + + if not file_path.exists(): + print(f"File not found: {file_path}") + return None + + if file_path.suffix.lower() not in self.supported_extensions: + print(f"Unsupported file type: {file_path.suffix}") + return None + + try: + # Extract text based on file type + if file_path.suffix.lower() == ".pdf": + try: + text_content = pdf_to_text(str(file_path)) + except Exception as pdf_error: + print(f"Error extracting PDF text: {pdf_error}") + # Fallback: try to read as text file + with open( + file_path, + "r", + encoding="utf-8", + errors="ignore", + ) as f: + text_content = f.read() + else: + try: + text_content = data_to_text(str(file_path)) + except Exception as data_error: + print(f"Error extracting text: {data_error}") + # Fallback: try to read as text file + with open( + file_path, + "r", + encoding="utf-8", + errors="ignore", + ) as f: + text_content = f.read() + + # Ensure text_content is a string + if callable(text_content): + print( + f"Warning: {file_path} returned a callable, trying to call it..." + ) + try: + text_content = text_content() + except Exception as call_error: + print(f"Error calling callable: {call_error}") + return None + + if not text_content or not isinstance(text_content, str): + print( + f"No valid text content extracted from: {file_path}" + ) + return None + + # Clean the text content + text_content = str(text_content).strip() + + return { + "file_path": str(file_path), + "file_name": file_path.name, + "file_type": file_path.suffix.lower(), + "text_content": text_content, + "file_size": file_path.stat().st_size, + "processed_at": datetime.utcnow().isoformat(), + } + + except Exception as e: + print(f"Error processing {file_path}: {str(e)}") + return None + + def process_directory( + self, directory_path: Union[str, Path], max_workers: int = 4 + ) -> List[Dict[str, str]]: + """ + Process all supported documents in a directory concurrently. 
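+
+        Files are discovered recursively for every supported extension
+        (both lower- and upper-case variants) and processed in parallel
+        with a ThreadPoolExecutor; files that fail to process are
+        skipped and the error is logged.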
+ + Args: + directory_path: Path to the directory containing documents + max_workers: Maximum number of concurrent workers for processing + + Returns: + List of processed document dictionaries + """ + directory_path = Path(directory_path) + + if not directory_path.is_dir(): + print(f"Directory not found: {directory_path}") + return [] + + # Find all supported files + supported_files = [] + for ext in self.supported_extensions: + supported_files.extend(directory_path.rglob(f"*{ext}")) + supported_files.extend( + directory_path.rglob(f"*{ext.upper()}") + ) + + if not supported_files: + print(f"No supported files found in: {directory_path}") + return [] + + print(f"Found {len(supported_files)} files to process") + + # Process files concurrently + processed_documents = [] + with ThreadPoolExecutor(max_workers=max_workers) as executor: + future_to_file = { + executor.submit( + self.process_document, file_path + ): file_path + for file_path in supported_files + } + + for future in concurrent.futures.as_completed( + future_to_file + ): + file_path = future_to_file[future] + try: + result = future.result() + if result: + processed_documents.append(result) + print(f"Processed: {result['file_name']}") + except Exception as e: + print(f"Error processing {file_path}: {str(e)}") + + print( + f"Successfully processed {len(processed_documents)} documents" + ) + return processed_documents + + +class QdrantRAGMemory: + """ + Enhanced Qdrant memory system for RAG operations with document storage. + + This class extends the basic Qdrant memory system to handle document ingestion, + chunking, and semantic search for large document collections. + """ + + def __init__( + self, + collection_name: str = "document_memories", + vector_size: int = 384, # Default size for all-MiniLM-L6-v2 + url: Optional[str] = None, + api_key: Optional[str] = None, + chunk_size: int = 1000, + chunk_overlap: int = 200, + ): + """ + Initialize the Qdrant RAG memory system. 
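+
+        If no url/api_key pair is supplied, an in-memory Qdrant
+        instance is used. The configured vector_size is checked against
+        the embedding model's actual output dimension at start-up and
+        corrected automatically if they differ.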
+ + Args: + collection_name: Name of the Qdrant collection to use + vector_size: Dimension of the embedding vectors + url: Optional Qdrant server URL (defaults to local) + api_key: Optional Qdrant API key for cloud deployment + chunk_size: Size of text chunks for processing + chunk_overlap: Overlap between consecutive chunks + """ + self.collection_name = collection_name + self.vector_size = vector_size + self.chunk_size = chunk_size + self.chunk_overlap = chunk_overlap + + # Initialize Qdrant client + if url and api_key: + self.client = QdrantClient(url=url, api_key=api_key) + else: + self.client = QdrantClient( + ":memory:" + ) # Local in-memory storage + + # Initialize embedding model + self.embedding_model = SentenceTransformer("all-MiniLM-L6-v2") + + # Get the actual embedding dimension from the model + sample_text = "Sample text for dimension check" + sample_embedding = self.embedding_model.encode(sample_text) + actual_dimension = len(sample_embedding) + + # Update vector_size to match the actual model dimension + if actual_dimension != self.vector_size: + print( + f"Updating vector size from {self.vector_size} to {actual_dimension} to match model" + ) + self.vector_size = actual_dimension + + # Create collection if it doesn't exist + self._create_collection() + + def _create_collection(self): + """Create the Qdrant collection if it doesn't exist.""" + collections = self.client.get_collections().collections + exists = any( + col.name == self.collection_name for col in collections + ) + + if not exists: + self.client.create_collection( + collection_name=self.collection_name, + vectors_config=VectorParams( + size=self.vector_size, distance=Distance.COSINE + ), + ) + print( + f"Created Qdrant collection: {self.collection_name}" + ) + + def _chunk_text(self, text: str) -> List[str]: + """ + Split text into overlapping chunks for better retrieval. + + Args: + text: Text content to chunk + + Returns: + List of text chunks + """ + # Ensure text is a string + if not isinstance(text, str): + text = str(text) + + if len(text) <= self.chunk_size: + return [text] + + chunks = [] + start = 0 + + while start < len(text): + end = start + self.chunk_size + + # Try to break at sentence boundaries + if end < len(text): + # Look for sentence endings + for i in range(end, max(start, end - 100), -1): + if text[i] in ".!?": + end = i + 1 + break + + chunk = text[start:end].strip() + if chunk: + chunks.append(chunk) + + start = end - self.chunk_overlap + if start >= len(text): + break + + return chunks + + def add_document( + self, document_data: Dict[str, str] + ) -> List[str]: + """ + Add a document to the memory system with chunking. 
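+
+        The document text is split into overlapping chunks (breaking at
+        sentence boundaries where possible), each chunk is embedded
+        with the sentence-transformer model, and every chunk is
+        upserted into Qdrant with metadata (document name, path, type,
+        chunk index, and timestamp).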
+ + Args: + document_data: Dictionary containing document information + + Returns: + List of memory IDs for the stored chunks + """ + text_content = document_data["text_content"] + + # Ensure text_content is a string + if not isinstance(text_content, str): + print( + f"Warning: text_content is not a string: {type(text_content)}" + ) + text_content = str(text_content) + + chunks = self._chunk_text(text_content) + + memory_ids = [] + + for i, chunk in enumerate(chunks): + # Generate embedding for the chunk + embedding = self.embedding_model.encode(chunk).tolist() + + # Prepare metadata + metadata = { + "document_name": document_data["file_name"], + "document_path": document_data["file_path"], + "document_type": document_data["file_type"], + "chunk_index": i, + "total_chunks": len(chunks), + "chunk_text": chunk, + "timestamp": datetime.utcnow().isoformat(), + "file_size": document_data["file_size"], + } + + # Store the chunk + memory_id = str(uuid.uuid4()) + self.client.upsert( + collection_name=self.collection_name, + points=[ + models.PointStruct( + id=memory_id, + payload=metadata, + vector=embedding, + ) + ], + ) + + memory_ids.append(memory_id) + + print( + f"Added document '{document_data['file_name']}' with {len(chunks)} chunks" + ) + return memory_ids + + def add_documents_batch( + self, documents: List[Dict[str, str]] + ) -> List[str]: + """ + Add multiple documents to the memory system. + + Args: + documents: List of document dictionaries + + Returns: + List of all memory IDs + """ + all_memory_ids = [] + + for document in documents: + memory_ids = self.add_document(document) + all_memory_ids.extend(memory_ids) + + return all_memory_ids + + def add(self, text: str, metadata: Optional[Dict] = None) -> str: + """ + Add a text entry to the memory system (required by Swarms interface). + + Args: + text: The text content to add + metadata: Optional metadata for the entry + + Returns: + str: ID of the stored memory + """ + if metadata is None: + metadata = {} + + # Generate embedding for the text + embedding = self.embedding_model.encode(text).tolist() + + # Prepare metadata + memory_metadata = { + "text": text, + "timestamp": datetime.utcnow().isoformat(), + "source": "agent_memory", + } + memory_metadata.update(metadata) + + # Store the point + memory_id = str(uuid.uuid4()) + self.client.upsert( + collection_name=self.collection_name, + points=[ + models.PointStruct( + id=memory_id, + payload=memory_metadata, + vector=embedding, + ) + ], + ) + + return memory_id + + def query( + self, + query_text: str, + limit: int = 5, + score_threshold: float = 0.7, + include_metadata: bool = True, + ) -> List[Dict]: + """ + Query memories based on text similarity. 
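+
+        The query text is embedded with the same sentence-transformer
+        model used for ingestion and matched against stored chunks by
+        cosine similarity. An empty list is returned when the
+        collection holds no points or when the search fails.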
+ + Args: + query_text: The text query to search for + limit: Maximum number of results to return + score_threshold: Minimum similarity score threshold + include_metadata: Whether to include metadata in results + + Returns: + List of matching memories with their metadata + """ + try: + # Check if collection has any points + collection_info = self.client.get_collection( + self.collection_name + ) + if collection_info.points_count == 0: + print( + "Warning: Collection is empty, no documents to query" + ) + return [] + + # Generate embedding for the query + query_embedding = self.embedding_model.encode( + query_text + ).tolist() + + # Search in Qdrant + results = self.client.search( + collection_name=self.collection_name, + query_vector=query_embedding, + limit=limit, + score_threshold=score_threshold, + ) + + memories = [] + for res in results: + memory = res.payload.copy() + memory["similarity_score"] = res.score + + if not include_metadata: + # Keep only essential information + memory = { + "chunk_text": memory.get("chunk_text", ""), + "document_name": memory.get( + "document_name", "" + ), + "similarity_score": memory[ + "similarity_score" + ], + } + + memories.append(memory) + + return memories + + except Exception as e: + print(f"Error querying collection: {e}") + return [] + + def get_collection_stats(self) -> Dict: + """ + Get statistics about the collection. + + Returns: + Dictionary containing collection statistics + """ + try: + collection_info = self.client.get_collection( + self.collection_name + ) + return { + "collection_name": self.collection_name, + "vector_size": collection_info.config.params.vectors.size, + "distance": collection_info.config.params.vectors.distance, + "points_count": collection_info.points_count, + } + except Exception as e: + print(f"Error getting collection stats: {e}") + return {} + + def clear_collection(self): + """Clear all memories from the collection.""" + self.client.delete_collection(self.collection_name) + self._create_collection() + print(f"Cleared collection: {self.collection_name}") + + +class QuantitativeTradingRAGAgent: + """ + Advanced quantitative trading agent with RAG capabilities for document analysis. + + This agent combines the structure from example.py with Qdrant RAG to provide + comprehensive financial analysis based on ingested documents. + """ + + def __init__( + self, + agent_name: str = "Quantitative-Trading-RAG-Agent", + collection_name: str = "financial_documents", + qdrant_url: Optional[str] = None, + qdrant_api_key: Optional[str] = None, + model_name: str = "claude-sonnet-4-20250514", + max_loops: int = 1, + chunk_size: int = 1000, + chunk_overlap: int = 200, + ): + """ + Initialize the Quantitative Trading RAG Agent. 
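+
+        Wires together a DocumentProcessor for ingestion and a
+        QdrantRAGMemory for retrieval, and passes the memory to the
+        underlying Swarms Agent as its long_term_memory.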
+ + Args: + agent_name: Name of the agent + collection_name: Name of the Qdrant collection + qdrant_url: Optional Qdrant server URL + qdrant_api_key: Optional Qdrant API key + model_name: LLM model to use + max_loops: Maximum number of agent loops + chunk_size: Size of text chunks for processing + chunk_overlap: Overlap between consecutive chunks + """ + self.agent_name = agent_name + self.collection_name = collection_name + + # Initialize document processor + self.document_processor = DocumentProcessor() + + # Initialize Qdrant RAG memory + self.rag_memory = QdrantRAGMemory( + collection_name=collection_name, + url=qdrant_url, + api_key=qdrant_api_key, + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + ) + + # Initialize the agent with RAG capabilities + self.agent = Agent( + agent_name=agent_name, + agent_description="Advanced quantitative trading and algorithmic analysis agent with RAG capabilities", + system_prompt="""You are an expert quantitative trading agent with deep expertise in: + - Algorithmic trading strategies and implementation + - Statistical arbitrage and market making + - Risk management and portfolio optimization + - High-frequency trading systems + - Market microstructure analysis + - Quantitative research methodologies + - Financial mathematics and stochastic processes + - Machine learning applications in trading + + Your core responsibilities include: + 1. Developing and backtesting trading strategies + 2. Analyzing market data and identifying alpha opportunities + 3. Implementing risk management frameworks + 4. Optimizing portfolio allocations + 5. Conducting quantitative research + 6. Monitoring market microstructure + 7. Evaluating trading system performance + + You have access to a comprehensive document database through RAG (Retrieval-Augmented Generation). + When answering questions, you can search through this database to find relevant information + and provide evidence-based responses. + + You maintain strict adherence to: + - Mathematical rigor in all analyses + - Statistical significance in strategy development + - Risk-adjusted return optimization + - Market impact minimization + - Regulatory compliance + - Transaction cost analysis + - Performance attribution + + You communicate in precise, technical terms while maintaining clarity for stakeholders.""", + model_name=model_name, + dynamic_temperature_enabled=True, + output_type="str-all-except-first", + max_loops=max_loops, + dynamic_context_window=True, + long_term_memory=self.rag_memory, + ) + + def ingest_documents( + self, documents_path: Union[str, Path] + ) -> int: + """ + Ingest documents from a directory into the RAG system. 
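+
+        Every supported file found under documents_path is processed
+        and stored as overlapping chunks in the RAG memory. Returns 0
+        when no documents are found or when processing fails.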
+ + Args: + documents_path: Path to directory containing documents + + Returns: + Number of documents successfully ingested + """ + print(f"Starting document ingestion from: {documents_path}") + + try: + # Process documents + processed_documents = ( + self.document_processor.process_directory( + documents_path + ) + ) + + if not processed_documents: + print("No documents to ingest") + return 0 + + # Add documents to RAG memory + memory_ids = self.rag_memory.add_documents_batch( + processed_documents + ) + + print( + f"Successfully ingested {len(processed_documents)} documents" + ) + print(f"Created {len(memory_ids)} memory chunks") + + return len(processed_documents) + + except Exception as e: + print(f"Error during document ingestion: {e}") + import traceback + + traceback.print_exc() + return 0 + + def query_documents( + self, query: str, limit: int = 5 + ) -> List[Dict]: + """ + Query the document database for relevant information. + + Args: + query: The query text + limit: Maximum number of results to return + + Returns: + List of relevant document chunks + """ + return self.rag_memory.query(query, limit=limit) + + def run_analysis(self, task: str) -> str: + """ + Run a financial analysis task using the agent with RAG capabilities. + + Args: + task: The analysis task to perform + + Returns: + Agent's response to the task + """ + print(f"Running analysis task: {task}") + + # First, query the document database for relevant context + relevant_docs = self.query_documents(task, limit=3) + + if relevant_docs: + # Enhance the task with relevant document context + context = "\n\nRelevant Document Information:\n" + for i, doc in enumerate(relevant_docs, 1): + context += f"\nDocument {i}: {doc.get('document_name', 'Unknown')}\n" + context += f"Relevance Score: {doc.get('similarity_score', 0):.3f}\n" + context += ( + f"Content: {doc.get('chunk_text', '')[:500]}...\n" + ) + + enhanced_task = f"{task}\n\n{context}" + else: + enhanced_task = task + + # Run the agent + response = self.agent.run(enhanced_task) + return response + + def get_database_stats(self) -> Dict: + """ + Get statistics about the document database. + + Returns: + Dictionary containing database statistics + """ + return self.rag_memory.get_collection_stats() + + +def main(): + """ + Main function demonstrating the Qdrant RAG agent with document ingestion. 
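+
+    The demo initializes the agent (optionally against a Qdrant cloud
+    instance via the QDRANT_URL and QDRANT_API_KEY environment
+    variables), ingests documents from a local "documents" directory
+    or falls back to a built-in sample document, runs a semantic
+    query, performs an LLM-backed analysis, and prints collection
+    statistics.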
+ """ + from datetime import datetime + + # Example usage + print("πŸš€ Initializing Quantitative Trading RAG Agent...") + + # Initialize the agent (you can set environment variables for Qdrant cloud) + agent = QuantitativeTradingRAGAgent( + agent_name="Quantitative-Trading-RAG-Agent", + collection_name="financial_documents", + qdrant_url=os.getenv( + "QDRANT_URL" + ), # Optional: For cloud deployment + qdrant_api_key=os.getenv( + "QDRANT_API_KEY" + ), # Optional: For cloud deployment + model_name="claude-sonnet-4-20250514", + max_loops=1, + chunk_size=1000, + chunk_overlap=200, + ) + + # Example: Ingest documents from a directory + documents_path = "documents" # Path to your documents + if os.path.exists(documents_path): + print(f"Found documents directory: {documents_path}") + try: + agent.ingest_documents(documents_path) + except Exception as e: + print(f"Error ingesting documents: {e}") + print("Continuing without document ingestion...") + else: + print(f"Documents directory not found: {documents_path}") + print("Creating a sample document for demonstration...") + + # Create a sample document + try: + sample_doc = { + "file_path": "sample_financial_analysis.txt", + "file_name": "sample_financial_analysis.txt", + "file_type": ".txt", + "text_content": """ + Gold ETFs: A Comprehensive Investment Guide + + Gold ETFs (Exchange-Traded Funds) provide investors with exposure to gold prices + without the need to physically store the precious metal. These funds track the + price of gold and offer several advantages including liquidity, diversification, + and ease of trading. + + Top Gold ETFs include: + 1. SPDR Gold Shares (GLD) - Largest gold ETF with high liquidity + 2. iShares Gold Trust (IAU) - Lower expense ratio alternative + 3. Aberdeen Standard Physical Gold ETF (SGOL) - Swiss storage option + + Investment strategies for gold ETFs: + - Portfolio diversification (5-10% allocation) + - Inflation hedge + - Safe haven during market volatility + - Tactical trading opportunities + + Market analysis shows that gold has historically served as a store of value + and hedge against inflation. Recent market conditions have increased interest + in gold investments due to economic uncertainty and geopolitical tensions. + """, + "file_size": 1024, + "processed_at": datetime.utcnow().isoformat(), + } + + # Add the sample document to the RAG memory + memory_ids = agent.rag_memory.add_document(sample_doc) + print( + f"Added sample document with {len(memory_ids)} chunks" + ) + + except Exception as e: + print(f"Error creating sample document: {e}") + print("Continuing without sample document...") + + # Example: Query the database + print("\nπŸ“Š Querying document database...") + try: + query_results = agent.query_documents( + "gold ETFs investment strategies", limit=3 + ) + print(f"Found {len(query_results)} relevant document chunks") + + if query_results: + print("Sample results:") + for i, result in enumerate(query_results[:2], 1): + print( + f" {i}. {result.get('document_name', 'Unknown')} (Score: {result.get('similarity_score', 0):.3f})" + ) + else: + print( + "No documents found in database. This is expected if no documents were ingested." + ) + except Exception as e: + print(f"❌ Query failed: {e}") + + # Example: Run financial analysis + print("\nπŸ’Ή Running financial analysis...") + analysis_task = "What are the best top 3 ETFs for gold coverage and what are their key characteristics?" 
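+    # run_analysis retrieves the top 3 most relevant document chunks and
+    # appends them to the task as context before calling the model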
+ try: + response = agent.run_analysis(analysis_task) + print("\nπŸ“ˆ Analysis Results:") + print(response) + except Exception as e: + print(f"❌ Analysis failed: {e}") + print("This might be due to API key or model access issues.") + print("Continuing with database statistics...") + + # Try a simpler query that doesn't require the LLM + print("\nπŸ” Trying simple document query instead...") + try: + simple_results = agent.query_documents( + "what do you see in the document?", limit=2 + ) + if simple_results: + print("Simple query results:") + for i, result in enumerate(simple_results, 1): + print( + f" {i}. {result.get('document_name', 'Unknown')}" + ) + print( + f" Content preview: {result.get('chunk_text', '')[:100]}..." + ) + else: + print("No results from simple query") + except Exception as simple_error: + print(f"Simple query also failed: {simple_error}") + + # Get database statistics + print("\nπŸ“Š Database Statistics:") + try: + stats = agent.get_database_stats() + for key, value in stats.items(): + print(f" {key}: {value}") + except Exception as e: + print(f"❌ Failed to get database statistics: {e}") + + print("\nβœ… Example completed successfully!") + print("πŸ’‘ To test with your own documents:") + print(" 1. Create a 'documents' directory") + print(" 2. Add PDF, TXT, or MD files") + print(" 3. Run the script again") + + +if __name__ == "__main__": + main() diff --git a/examples/single_agent/rag/simple_example.py b/examples/single_agent/rag/simple_example.py new file mode 100644 index 00000000..4d916beb --- /dev/null +++ b/examples/single_agent/rag/simple_example.py @@ -0,0 +1,184 @@ +""" +Simple Example: Qdrant RAG with Document Ingestion + +This is a simplified example showing the basic usage of the Qdrant RAG system +for document ingestion and querying. +""" + +from pathlib import Path +from examples.single_agent.rag.qdrant_rag_example import ( + QuantitativeTradingRAGAgent, +) + + +def create_sample_documents(): + """ + Create sample documents for demonstration purposes. + """ + # Create a sample documents directory + docs_dir = Path("./sample_documents") + docs_dir.mkdir(exist_ok=True) + + # Create sample text files + sample_texts = { + "gold_etf_guide.txt": """ + Gold ETFs: A Comprehensive Guide + + Gold ETFs (Exchange-Traded Funds) provide investors with exposure to gold prices + without the need to physically store the precious metal. These funds track the + price of gold and offer several advantages including liquidity, diversification, + and ease of trading. + + Top Gold ETFs include: + 1. SPDR Gold Shares (GLD) - Largest gold ETF with high liquidity + 2. iShares Gold Trust (IAU) - Lower expense ratio alternative + 3. Aberdeen Standard Physical Gold ETF (SGOL) - Swiss storage option + + Investment strategies for gold ETFs: + - Portfolio diversification (5-10% allocation) + - Inflation hedge + - Safe haven during market volatility + - Tactical trading opportunities + """, + "market_analysis.txt": """ + Market Analysis: Gold Investment Trends + + Gold has historically served as a store of value and hedge against inflation. 
+ Recent market conditions have increased interest in gold investments due to: + + - Economic uncertainty and geopolitical tensions + - Inflation concerns and currency devaluation + - Central bank policies and interest rate environment + - Portfolio diversification needs + + Key factors affecting gold prices: + - US Dollar strength/weakness + - Real interest rates + - Central bank gold purchases + - Market risk sentiment + - Supply and demand dynamics + + Investment recommendations: + - Consider gold as 5-15% of total portfolio + - Use dollar-cost averaging for entry + - Monitor macroeconomic indicators + - Rebalance periodically + """, + "portfolio_strategies.txt": """ + Portfolio Strategies: Incorporating Gold + + Strategic allocation to gold can enhance portfolio performance through: + + 1. Risk Reduction: + - Negative correlation with equities during crises + - Volatility dampening effects + - Drawdown protection + + 2. Return Enhancement: + - Long-term appreciation potential + - Inflation-adjusted returns + - Currency diversification benefits + + 3. Implementation Methods: + - Physical gold (coins, bars) + - Gold ETFs and mutual funds + - Gold mining stocks + - Gold futures and options + + Optimal allocation ranges: + - Conservative: 5-10% + - Moderate: 10-15% + - Aggressive: 15-20% + + Rebalancing frequency: Quarterly to annually + """, + } + + # Write sample files + for filename, content in sample_texts.items(): + file_path = docs_dir / filename + with open(file_path, "w", encoding="utf-8") as f: + f.write(content.strip()) + + print( + f"Created {len(sample_texts)} sample documents in {docs_dir}" + ) + return docs_dir + + +def main(): + """ + Main function demonstrating basic Qdrant RAG usage. + """ + print("πŸš€ Qdrant RAG Simple Example") + print("=" * 50) + + # Create sample documents + docs_dir = create_sample_documents() + + # Initialize the RAG agent + print("\nπŸ“Š Initializing Quantitative Trading RAG Agent...") + agent = QuantitativeTradingRAGAgent( + agent_name="Simple-Financial-Agent", + collection_name="sample_financial_docs", + model_name="claude-sonnet-4-20250514", + chunk_size=800, # Smaller chunks for sample documents + chunk_overlap=100, + ) + + # Ingest the sample documents + print(f"\nπŸ“š Ingesting documents from {docs_dir}...") + num_ingested = agent.ingest_documents(docs_dir) + print(f"βœ… Successfully ingested {num_ingested} documents") + + # Query the document database + print("\nπŸ” Querying document database...") + queries = [ + "What are the top gold ETFs?", + "How should I allocate gold in my portfolio?", + "What factors affect gold prices?", + ] + + for query in queries: + print(f"\nQuery: {query}") + results = agent.query_documents(query, limit=2) + print(f"Found {len(results)} relevant chunks:") + + for i, result in enumerate(results, 1): + print( + f" {i}. {result['document_name']} (Score: {result['similarity_score']:.3f})" + ) + print(f" Content: {result['chunk_text'][:150]}...") + + # Run a comprehensive analysis + print("\nπŸ’Ή Running comprehensive analysis...") + analysis_task = "Based on the available documents, provide a summary of gold ETF investment strategies and portfolio allocation recommendations." 
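+    # The agent augments the task with retrieved document context; model
+    # errors (for example, a missing API key) are caught and reported below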
+ + try: + response = agent.run_analysis(analysis_task) + print("\nπŸ“ˆ Analysis Results:") + print("-" * 30) + print(response) + except Exception as e: + print(f"❌ Analysis failed: {e}") + print("This might be due to API key or model access issues.") + + # Show database statistics + print("\nπŸ“Š Database Statistics:") + stats = agent.get_database_stats() + for key, value in stats.items(): + print(f" {key}: {value}") + + # Cleanup + print("\n🧹 Cleaning up sample documents...") + import shutil + + if docs_dir.exists(): + shutil.rmtree(docs_dir) + print("Sample documents removed.") + + print("\nβœ… Example completed successfully!") + + +if __name__ == "__main__": + main() diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index bad1fc95..a8f7f113 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -48,17 +48,11 @@ from swarms.schemas.agent_mcp_errors import ( from swarms.schemas.agent_step_schemas import ManySteps, Step from swarms.schemas.base_schemas import ( AgentChatCompletionResponse, - ChatCompletionResponseChoice, - ChatMessageResponse, ) from swarms.schemas.conversation_schema import ConversationSchema from swarms.schemas.mcp_schemas import ( MCPConnection, ) -from swarms.structs.agent_rag_handler import ( - AgentRAGHandler, - RAGConfig, -) from swarms.structs.agent_roles import agent_roles from swarms.structs.conversation import Conversation from swarms.structs.ma_utils import set_random_models_for_agents @@ -78,6 +72,7 @@ from swarms.tools.py_func_to_openai_func_str import ( convert_multiple_functions_to_openai_function_schema, ) from swarms.utils.data_to_text import data_to_text +from swarms.utils.dynamic_context_window import dynamic_auto_chunking from swarms.utils.file_processing import create_file_in_folder from swarms.utils.formatter import formatter from swarms.utils.generate_keys import generate_api_key @@ -116,8 +111,6 @@ ToolUsageType = Union[BaseModel, Dict[str, Any]] # Agent Exceptions - - class AgentError(Exception): """Base class for all agent-related exceptions.""" @@ -430,7 +423,6 @@ class Agent: conversation_schema: Optional[ConversationSchema] = None, llm_base_url: Optional[str] = None, llm_api_key: Optional[str] = None, - rag_config: Optional[RAGConfig] = None, tool_call_summary: bool = True, output_raw_json_from_tool_call: bool = False, summarize_multiple_images: bool = False, @@ -570,7 +562,6 @@ class Agent: self.conversation_schema = conversation_schema self.llm_base_url = llm_base_url self.llm_api_key = llm_api_key - self.rag_config = rag_config self.tool_call_summary = tool_call_summary self.output_raw_json_from_tool_call = ( output_raw_json_from_tool_call @@ -625,22 +616,11 @@ class Agent: if self.random_models_on is True: self.model_name = set_random_models_for_agents() - if self.long_term_memory is not None: - self.rag_handler = self.rag_setup_handling() - if self.dashboard is True: self.print_dashboard() self.reliability_check() - def rag_setup_handling(self): - return AgentRAGHandler( - long_term_memory=self.long_term_memory, - config=self.rag_config, - agent_name=self.agent_name, - verbose=self.verbose, - ) - def setup_tools(self): return BaseTool( tools=self.tools, @@ -1022,6 +1002,42 @@ class Agent: title=f"Agent {self.agent_name} Dashboard", ) + def handle_rag_query(self, query: str): + """ + Handle RAG query + """ + try: + logger.info( + f"Agent: {self.agent_name} Querying RAG memory for: {query}" + ) + output = self.long_term_memory.query( + query, + ) + + output = dynamic_auto_chunking( + content=output, + 
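+                # Trim the retrieved text so it fits the model's token budget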
context_length=self.max_tokens, + tokenizer_model_name=self.model_name, + ) + + self.short_memory.add( + role="system", + content=( + f"πŸ” [RAG Query Initiated]\n" + f"━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n" + f"πŸ“ Query:\n{query}\n\n" + f"πŸ“š Retrieved Knowledge (RAG Output):\n{output}\n" + f"━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n" + f"πŸ’‘ The above information was retrieved from the agent's long-term memory using Retrieval-Augmented Generation (RAG). " + f"Use this context to inform your next response or reasoning step." + ), + ) + except Exception as e: + logger.error( + f"Agent: {self.agent_name} Error handling RAG query: {e} Traceback: {traceback.format_exc()}" + ) + raise e + # Main function def _run( self, @@ -1032,21 +1048,30 @@ class Agent: **kwargs, ) -> Any: """ - run the agent + Execute the agent's main loop for a given task. + + This function manages the agent's reasoning and action loop, including: + - Initializing the task and context + - Handling Retrieval-Augmented Generation (RAG) queries if enabled + - Planning (if enabled) + - Managing internal reasoning loops and memory + - Optionally processing images and streaming output + - Autosaving agent state if configured Args: - task (str): The task to be performed. - img (str): The image to be processed. - is_last (bool): Indicates if this is the last task. + task (Optional[Union[str, Any]]): The task or prompt for the agent to process. + img (Optional[str]): Optional image path or data to be processed by the agent. + streaming_callback (Optional[Callable[[str], None]]): Optional callback for streaming output. + *args: Additional positional arguments for extensibility. + **kwargs: Additional keyword arguments for extensibility. Returns: - Any: The output of the agent. - (string, list, json, dict, yaml, xml) + Any: The agent's output, which may be a string, list, JSON, dict, YAML, XML, or other type depending on the agent's configuration and the task. Examples: agent(task="What is the capital of France?") - agent(task="What is the capital of France?", img="path/to/image.jpg") - agent(task="What is the capital of France?", img="path/to/image.jpg", is_last=True) + agent(task="Summarize this document.", img="path/to/image.jpg") + agent(task="Analyze this image.", img="path/to/image.jpg", is_last=True) """ try: @@ -1056,6 +1081,13 @@ class Agent: self.short_memory.add(role=self.user_name, content=task) + # Handle RAG query only once + if ( + self.long_term_memory is not None + and self.rag_every_loop is False + ): + self.handle_rag_query(task) + if self.plan_enabled is True: self.plan(task) @@ -1076,6 +1108,13 @@ class Agent: ): loop_count += 1 + # Handle RAG query every loop + if ( + self.long_term_memory is not None + and self.rag_every_loop is True + ): + self.handle_rag_query(task) + if ( isinstance(self.max_loops, int) and self.max_loops >= 2 @@ -1284,7 +1323,7 @@ class Agent: traceback_info = traceback.format_exc() logger.error( - f"An error occurred while running your agent {self.agent_name}.\n" + f"Agent: {self.agent_name} An error occurred while running your agent.\n" f"Error Type: {error_type}\n" f"Error Message: {error_message}\n" f"Traceback:\n{traceback_info}\n" @@ -1508,8 +1547,16 @@ class Agent: "Max loops is not provided or is set to 0. Please set max loops to 1 or more." ) - if self.max_tokens is None or self.max_tokens == 0: - self.max_tokens = get_max_tokens(self.model_name) + # Ensure max_tokens is set to a valid value based on the model, with a robust fallback. 
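+        # get_max_tokens() may return None or 0 for models it does not
+        # recognize; in that case fall back to a conservative 8192 default.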
+ if self.max_tokens is None or self.max_tokens <= 0: + suggested_tokens = get_max_tokens(self.model_name) + if suggested_tokens is not None and suggested_tokens > 0: + self.max_tokens = suggested_tokens + else: + logger.warning( + f"Could not determine max_tokens for model '{self.model_name}'. Falling back to default value of 8192." + ) + self.max_tokens = 8192 if self.context_length is None or self.context_length == 0: raise AgentInitializationError( @@ -2097,40 +2144,6 @@ class Agent: ) raise error - def memory_query(self, task: str = None, *args, **kwargs) -> None: - try: - # Query the long term memory - if self.long_term_memory is not None: - formatter.print_panel(f"Querying RAG for: {task}") - - memory_retrieval = self.long_term_memory.query( - task, *args, **kwargs - ) - - memory_retrieval = ( - f"Documents Available: {str(memory_retrieval)}" - ) - - # # Count the tokens - # memory_token_count = count_tokens( - # memory_retrieval - # ) - # if memory_token_count > self.memory_chunk_size: - # # Truncate the memory by the memory chunk size - # memory_retrieval = self.truncate_string_by_tokens( - # memory_retrieval, self.memory_chunk_size - # ) - - self.short_memory.add( - role="Database", - content=memory_retrieval, - ) - - return None - except Exception as e: - logger.error(f"An error occurred: {e}") - raise e - def sentiment_analysis_handler(self, response: str = None): """ Performs sentiment analysis on the given response and stores the result in the short-term memory. @@ -2223,107 +2236,6 @@ class Agent: return out - def log_step_metadata( - self, loop: int, task: str, response: str - ) -> Step: - """Log metadata for each step of agent execution.""" - # Generate unique step ID - step_id = f"step_{loop}_{uuid.uuid4().hex}" - - # Calculate token usage - # full_memory = self.short_memory.return_history_as_string() - # prompt_tokens = count_tokens(full_memory) - # completion_tokens = count_tokens(response) - # total_tokens = prompt_tokens + completion_tokens - total_tokens = (count_tokens(task) + count_tokens(response),) - - # # Get memory responses - # memory_responses = { - # "short_term": ( - # self.short_memory.return_history_as_string() - # if self.short_memory - # else None - # ), - # "long_term": ( - # self.long_term_memory.query(task) - # if self.long_term_memory - # else None - # ), - # } - - # # Get tool responses if tool was used - # if self.tools: - # try: - # tool_call_output = parse_and_execute_json( - # self.tools, response, parse_md=True - # ) - # if tool_call_output: - # { - # "tool_name": tool_call_output.get( - # "tool_name", "unknown" - # ), - # "tool_args": tool_call_output.get("args", {}), - # "tool_output": str( - # tool_call_output.get("output", "") - # ), - # } - # except Exception as e: - # logger.debug( - # f"No tool call detected in response: {e}" - # ) - - # Create memory usage tracking - # memory_usage = { - # "short_term": ( - # len(self.short_memory.messages) - # if self.short_memory - # else 0 - # ), - # "long_term": ( - # self.long_term_memory.count - # if self.long_term_memory - # else 0 - # ), - # "responses": memory_responses, - # } - - step_log = Step( - step_id=step_id, - time=time.time(), - tokens=total_tokens, - response=AgentChatCompletionResponse( - id=self.id, - agent_name=self.agent_name, - object="chat.completion", - choices=ChatCompletionResponseChoice( - index=loop, - input=task, - message=ChatMessageResponse( - role=self.agent_name, - content=response, - ), - ), - # usage=UsageInfo( - # prompt_tokens=prompt_tokens, - # 
completion_tokens=completion_tokens, - # total_tokens=total_tokens, - # ), - # tool_calls=( - # [] if tool_response is None else [tool_response] - # ), - # memory_usage=None, - ), - ) - - # Update total tokens if agent_output exists - # if hasattr(self, "agent_output"): - # self.agent_output.total_tokens += ( - # self.response.total_tokens - # ) - - # Add step to agent output tracking - self.step_pool.append(step_log) - def update_tool_usage( self, step_id: str,