diff --git a/.gitignore b/.gitignore index 313a28f5..8ed30e17 100644 --- a/.gitignore +++ b/.gitignore @@ -13,11 +13,14 @@ target/ Cargo.lock .pytest_cache static/generated +conversations/ runs Financial-Analysis-Agent_state.json +conversations/ experimental ffn_alternatives artifacts_five +experimental/ encryption errors chroma diff --git a/docs/swarms_cloud/swarms_api.md b/docs/swarms_cloud/swarms_api.md index e5386941..9da9ebce 100644 --- a/docs/swarms_cloud/swarms_api.md +++ b/docs/swarms_cloud/swarms_api.md @@ -13,11 +13,17 @@ The Swarms API provides a robust, scalable infrastructure for deploying and mana Key capabilities include: - **Intelligent Swarm Management**: Create and execute swarms of specialized AI agents that collaborate to solve complex tasks + - **Automatic Agent Generation**: Dynamically create optimized agents based on task requirements + - **Multiple Swarm Architectures**: Choose from various swarm patterns to match your specific workflow needs + - **Scheduled Execution**: Set up automated, scheduled swarm executions + - **Comprehensive Logging**: Track and analyze all API interactions + - **Cost Management**: Predictable, transparent pricing with optimized resource utilization + - **Enterprise Security**: Full API key authentication and management Swarms API is designed for production use cases requiring sophisticated AI orchestration, with applications in finance, healthcare, legal, research, and other domains where complex reasoning and multi-agent collaboration are needed. @@ -48,6 +54,7 @@ API keys can be obtained and managed at [https://swarms.world/platform/api-keys] | `/v1/swarms/available` | GET | Get all available swarms as a list of strings | | `/v1/models/available` | GET | Get all available models as a list of strings | | `/v1/agent/completions` | POST | Run a single agent with specified configuration | +| `/v1/agent/batch/completions` | POST | Run a batch of individual agent completions| @@ -481,6 +488,181 @@ curl -X GET "https://api.swarms.world/v1/swarm/logs" \ } ``` + + +## Individual Agent Endpoints + +### Run Single Agent + +### AgentCompletion Model + +The `AgentCompletion` model defines the configuration for running a single agent task. + +| Field | Type | Description | Required | +|-------|------|-------------|----------| +| `agent_config` | AgentSpec | The configuration of the agent to be completed | Yes | +| `task` | string | The task to be completed by the agent | Yes | +| `history` | Dict[str, Any] | The history of the agent's previous tasks and responses | No | + + +### AgentSpec Model + +The `AgentSpec` model defines the configuration for an individual agent. + +| Field | Type | Default | Description | Required | +|-------|------|---------|-------------|----------| +| `agent_name` | string | None | The unique name assigned to the agent | Yes | +| `description` | string | None | Detailed explanation of the agent's purpose | No | +| `system_prompt` | string | None | Initial instruction provided to the agent | No | +| `model_name` | string | "gpt-4o-mini" | Name of the AI model to use | No | +| `auto_generate_prompt` | boolean | false | Whether to auto-generate prompts | No | +| `max_tokens` | integer | 8192 | Maximum tokens in response | No | +| `temperature` | float | 0.5 | Controls randomness (0-1) | No | +| `role` | string | "worker" | Role of the agent | No | +| `max_loops` | integer | 1 | Maximum iterations | No | +| `tools_list_dictionary` | List[Dict] | None | Available tools for the agent | No | +| `mcp_url` | string | None | URL for Model Control Protocol | No | + + +Execute a task using a single agent with specified configuration. + +**Endpoint**: `/v1/agent/completions` +**Method**: POST +**Rate Limit**: 100 requests per 60 seconds + +**Request Body**: +```json +{ + "agent_config": { + "agent_name": "Research Assistant", + "description": "Specialized in research and analysis", + "system_prompt": "You are an expert research assistant.", + "model_name": "gpt-4o", + "auto_generate_prompt": false, + "max_tokens": 8192, + "temperature": 0.5, + "role": "worker", + "max_loops": 1, + "tools_list_dictionary": [ + { + "name": "search", + "description": "Search the web for information", + "parameters": { + "query": "string" + } + } + ], + "mcp_url": "https://example-mcp.com" + }, + "task": "Research the latest developments in quantum computing and summarize key findings", + "history": { + "previous_research": "Earlier findings on quantum computing basics...", + "user_preferences": "Focus on practical applications..." + } +} +``` + +**Response**: +```json +{ + "id": "agent-abc123xyz", + "success": true, + "name": "Research Assistant", + "description": "Specialized in research and analysis", + "temperature": 0.5, + "outputs": { + "research_summary": "...", + "key_findings": [ + "..." + ] + }, + "usage": { + "input_tokens": 450, + "output_tokens": 850, + "total_tokens": 1300, + "mcp_url": 0.1 + }, + "timestamp": "2024-03-05T12:34:56.789Z" +} +``` + +#### Run Batch Agents + +Execute multiple agent tasks in parallel. + +**Endpoint**: `/v1/agent/batch/completions` +**Method**: POST +**Rate Limit**: 100 requests per 60 seconds +**Maximum Batch Size**: 10 requests +**Input** A list of `AgentCompeletion` inputs + +**Request Body**: +```json +[ + { + "agent_config": { + "agent_name": "Market Analyst", + "description": "Expert in market analysis", + "system_prompt": "You are a financial market analyst.", + "model_name": "gpt-4o", + "temperature": 0.3 + }, + "task": "Analyze the current market trends in AI technology sector" + }, + { + "agent_config": { + "agent_name": "Technical Writer", + "description": "Specialized in technical documentation", + "system_prompt": "You are a technical documentation expert.", + "model_name": "gpt-4o", + "temperature": 0.7 + }, + "task": "Create a technical guide for implementing OAuth2 authentication" + } +] +``` + +**Response**: +```json +{ + "batch_id": "agent-batch-xyz789", + "total_requests": 2, + "execution_time": 15.5, + "timestamp": "2024-03-05T12:34:56.789Z", + "results": [ + { + "id": "agent-abc123", + "success": true, + "name": "Market Analyst", + "outputs": { + "market_analysis": "..." + }, + "usage": { + "input_tokens": 300, + "output_tokens": 600, + "total_tokens": 900 + } + }, + { + "id": "agent-def456", + "success": true, + "name": "Technical Writer", + "outputs": { + "technical_guide": "..." + }, + "usage": { + "input_tokens": 400, + "output_tokens": 800, + "total_tokens": 1200 + } + } + ] +} +``` + + +----- + ## Production Examples ### Python Examples @@ -889,100 +1071,88 @@ Error responses include a detailed message explaining the issue: ## Rate Limiting -The API enforces a rate limit of 100 requests per 60-second window. When exceeded, a 429 status code is returned. Implement appropriate retry logic with exponential backoff in production applications. +| Description | Details | +|-------------|---------| +| Rate Limit | 100 requests per 60-second window | +| Exceed Consequence | 429 status code returned | +| Recommended Action | Implement retry logic with exponential backoff | ## Billing & Cost Management -The API uses a credit-based billing system with costs calculated based on: - -1. **Agent Count**: Base cost per agent - - -2. **Input Tokens**: Cost based on the size of input data and prompts - -3. **Output Tokens**: Cost based on the length of generated responses - -4. **Time of Day**: Reduced rates during nighttime hours (8 PM to 6 AM PT) - -Cost information is included in each response's metadata for transparency and forecasting. +| Cost Factor | Description | +|-------------|-------------| +| Agent Count | Base cost per agent | +| Input Tokens | Cost based on size of input data and prompts | +| Output Tokens | Cost based on length of generated responses | +| Time of Day | Reduced rates during nighttime hours (8 PM to 6 AM PT) | +| Cost Information | Included in each response's metadata | ## Best Practices -1. **Task Description** - - - Provide detailed, specific task descriptions - - - Include all necessary context and constraints - - - Structure complex inputs for easier processing - -2. **Agent Configuration** - - - For simple tasks, use `AutoSwarmBuilder` to automatically generate optimal agents - - - For complex or specialized tasks, manually define agents with specific expertise - - - Use appropriate `swarm_type` for your workflow pattern +### Task Description -3. **Production Implementation** +| Practice | Description | +|----------|-------------| +| Detail | Provide detailed, specific task descriptions | +| Context | Include all necessary context and constraints | +| Structure | Structure complex inputs for easier processing | - - Implement robust error handling and retries - - - Log API responses for debugging and auditing - - - Monitor costs closely during development and testing - - - Use scheduled jobs for recurring tasks instead of continuous polling +### Agent Configuration -4. **Cost Optimization** +| Practice | Description | +|----------|-------------| +| Simple Tasks | Use `AutoSwarmBuilder` for automatic agent generation | +| Complex Tasks | Manually define agents with specific expertise | +| Workflow | Use appropriate `swarm_type` for your workflow pattern | - - Batch related tasks when possible +### Production Implementation - - Schedule non-urgent tasks during discount hours +| Practice | Description | +|----------|-------------| +| Error Handling | Implement robust error handling and retries | +| Logging | Log API responses for debugging and auditing | +| Cost Monitoring | Monitor costs closely during development and testing | +| Scheduling | Use scheduled jobs for recurring tasks instead of polling | - - Carefully scope task descriptions to reduce token usage - - - Cache results when appropriate +### Cost Optimization +| Practice | Description | +|----------|-------------| +| Batching | Batch related tasks when possible | +| Scheduling | Schedule non-urgent tasks during discount hours | +| Scoping | Carefully scope task descriptions to reduce token usage | +| Caching | Cache results when appropriate | ## Support -For technical assistance with the Swarms API, please contact: - -- Documentation: [https://docs.swarms.world](https://docs.swarms.world) -- Email: kye@swarms.world -- Community Discord: [https://discord.gg/jM3Z6M9uMq](https://discord.gg/jM3Z6M9uMq) -- Swarms Marketplace: [https://swarms.world](https://swarms.world) -- Swarms AI Website: [https://swarms.ai](https://swarms.ai) +| Support Type | Contact Information | +|--------------|---------------------| +| Documentation | [https://docs.swarms.world](https://docs.swarms.world) | +| Email | kye@swarms.world | +| Community | [https://discord.gg/jM3Z6M9uMq](https://discord.gg/jM3Z6M9uMq) | +| Marketplace | [https://swarms.world](https://swarms.world) | +| Website | [https://swarms.ai](https://swarms.ai) | ## Service Tiers -The API offers two service tiers to accommodate different processing needs: - ### Standard Tier -- Default processing tier - -- Immediate execution - -- Higher priority processing - -- Standard pricing - -- 5-minute timeout limit +| Feature | Description | +|---------|-------------| +| Processing | Default processing tier | +| Execution | Immediate execution | +| Priority | Higher priority processing | +| Pricing | Standard pricing | +| Timeout | 5-minute timeout limit | ### Flex Tier -- Lower cost processing - -- Automatic retries (up to 3 attempts) - -- Longer timeout (15 minutes) - -- 75% discount on token costs - -- Best for non-urgent tasks - -- Exponential backoff on resource contention - -To use the flex tier, set `service_tier: "flex"` in your SwarmSpec configuration. +| Feature | Description | +|---------|-------------| +| Cost | Lower cost processing | +| Retries | Automatic retries (up to 3 attempts) | +| Timeout | 15-minute timeout | +| Discount | 75% discount on token costs | +| Suitability | Best for non-urgent tasks | +| Backoff | Exponential backoff on resource contention | +| Configuration | Set `service_tier: "flex"` in SwarmSpec | \ No newline at end of file diff --git a/example.py b/example.py index 3915827c..a99560dc 100644 --- a/example.py +++ b/example.py @@ -1,4 +1,6 @@ +import time from swarms import Agent +from swarms.schemas.conversation_schema import ConversationSchema # Initialize the agent agent = Agent( @@ -33,11 +35,18 @@ agent = Agent( - Performance attribution You communicate in precise, technical terms while maintaining clarity for stakeholders.""", - max_loops=3, + max_loops=1, model_name="gpt-4o-mini", dynamic_temperature_enabled=True, - output_type="all", + output_type="json", safety_prompt_on=True, + conversation_schema=ConversationSchema( + time_enabled=True, + message_id_on=True, + ), ) -print(agent.run("What are the best top 3 etfs for gold coverage?")) +out = agent.run("What are the best top 3 etfs for gold coverage?") + +time.sleep(10) +print(out) diff --git a/examples/misc/test_load_conversation.py b/examples/misc/test_load_conversation.py new file mode 100644 index 00000000..bf1235b4 --- /dev/null +++ b/examples/misc/test_load_conversation.py @@ -0,0 +1,85 @@ +from swarms.structs.conversation import ( + Conversation, + get_conversation_dir, +) +import os +import shutil + + +def cleanup_test_conversations(): + """Clean up test conversation files after running the example.""" + conv_dir = get_conversation_dir() + if os.path.exists(conv_dir): + shutil.rmtree(conv_dir) + print( + f"\nCleaned up test conversations directory: {conv_dir}" + ) + + +def main(): + # Example 1: In-memory only conversation (no saving) + print("\nExample 1: In-memory conversation (no saving)") + conv_memory = Conversation( + name="memory_only_chat", + save_enabled=False, # Don't save to disk + autosave=False, + ) + conv_memory.add("user", "This conversation won't be saved!") + conv_memory.display_conversation() + + # Example 2: Conversation with autosaving + print("\nExample 2: Conversation with autosaving") + conversation_dir = get_conversation_dir() + print(f"Conversations will be stored in: {conversation_dir}") + + conv_autosave = Conversation( + name="autosave_chat", + conversations_dir=conversation_dir, + save_enabled=True, # Enable saving + autosave=True, # Enable autosaving + ) + print(f"Created new conversation with ID: {conv_autosave.id}") + print( + f"This conversation is saved at: {conv_autosave.save_filepath}" + ) + + # Add some messages (each will be autosaved) + conv_autosave.add("user", "Hello! How are you?") + conv_autosave.add( + "assistant", + "I'm doing well, thank you! How can I help you today?", + ) + + # Example 3: Load from specific file + print("\nExample 3: Load from specific file") + custom_file = os.path.join(conversation_dir, "custom_chat.json") + + # Create a conversation and save it to a custom file + conv_custom = Conversation( + name="custom_chat", + save_filepath=custom_file, + save_enabled=True, + ) + conv_custom.add("user", "This is a custom saved conversation") + conv_custom.add( + "assistant", "I'll be saved in a custom location!" + ) + conv_custom.save_as_json() + + # Now load it specifically + loaded_conv = Conversation.load_conversation( + name="custom_chat", load_filepath=custom_file + ) + print("Loaded custom conversation:") + loaded_conv.display_conversation() + + # List all saved conversations + print("\nAll saved conversations:") + conversations = Conversation.list_conversations(conversation_dir) + for conv_info in conversations: + print( + f"- {conv_info['name']} (ID: {conv_info['id']}, Created: {conv_info['created_at']})" + ) + + +main() diff --git a/examples/single_agent/utils/list_agent_output_types.py b/examples/single_agent/utils/list_agent_output_types.py new file mode 100644 index 00000000..af9e56d3 --- /dev/null +++ b/examples/single_agent/utils/list_agent_output_types.py @@ -0,0 +1,9 @@ +from swarms.structs.agent import Agent + +agent = Agent( + agent_name="test", + agent_description="test", + system_prompt="test", +) + +print(agent.list_output_types()) diff --git a/pyproject.toml b/pyproject.toml index 58e7e0ff..be9afd13 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "7.8.3" +version = "7.8.5" description = "Swarms - TGSC" license = "MIT" authors = ["Kye Gomez "] diff --git a/swarms/agents/ape_agent.py b/swarms/agents/ape_agent.py index 420b7aaa..72ea7867 100644 --- a/swarms/agents/ape_agent.py +++ b/swarms/agents/ape_agent.py @@ -1,27 +1,17 @@ -from typing import Any +from typing import Callable -from tenacity import retry, stop_after_attempt, wait_exponential -from swarms.prompts.prompt_generator import ( - prompt_generator_sys_prompt as second_sys_prompt, -) from swarms.prompts.prompt_generator_optimizer import ( - prompt_generator_sys_prompt, + OPENAI_PROMPT_GENERATOR_SYS_PROMPT, ) from swarms.utils.loguru_logger import initialize_logger logger = initialize_logger(log_folder="ape_agent") -@retry( - stop=stop_after_attempt(3), - wait=wait_exponential(multiplier=1, min=4, max=10), -) def auto_generate_prompt( task: str = None, - model: Any = None, - max_tokens: int = 4000, - use_second_sys_prompt: bool = True, + model: Callable = None, *args, **kwargs, ): @@ -38,16 +28,9 @@ def auto_generate_prompt( str: The generated prompt. """ try: - system_prompt = ( - second_sys_prompt.get_prompt() - if use_second_sys_prompt - else prompt_generator_sys_prompt.get_prompt() - ) - output = model.run( - system_prompt + task, max_tokens=max_tokens + return model.run( + task=f"{OPENAI_PROMPT_GENERATOR_SYS_PROMPT} \n\n Task: {task}" ) - print(output) - return output except Exception as e: logger.error(f"Error generating prompt: {str(e)}") raise diff --git a/swarms/agents/consistency_agent.py b/swarms/agents/consistency_agent.py index 5ee3aa2f..b06db583 100644 --- a/swarms/agents/consistency_agent.py +++ b/swarms/agents/consistency_agent.py @@ -7,7 +7,7 @@ from loguru import logger from swarms.structs.agent import Agent from swarms.structs.conversation import Conversation from swarms.structs.malt import majority_voting_prompt -from swarms.structs.output_types import OutputType +from swarms.utils.output_types import OutputType from swarms.utils.any_to_str import any_to_str from swarms.utils.history_output_formatter import ( history_output_formatter, diff --git a/swarms/agents/i_agent.py b/swarms/agents/i_agent.py index fdd17c64..674c3f4a 100644 --- a/swarms/agents/i_agent.py +++ b/swarms/agents/i_agent.py @@ -22,7 +22,7 @@ from typing import List, Tuple from loguru import logger from swarms.structs.agent import Agent from swarms.structs.conversation import Conversation -from swarms.structs.output_types import OutputType +from swarms.utils.output_types import OutputType from swarms.utils.history_output_formatter import ( history_output_formatter, ) diff --git a/swarms/agents/reasoning_agents.py b/swarms/agents/reasoning_agents.py index 17a6089f..e1419ee0 100644 --- a/swarms/agents/reasoning_agents.py +++ b/swarms/agents/reasoning_agents.py @@ -7,7 +7,7 @@ from swarms.agents.i_agent import ( IterativeReflectiveExpansion as IREAgent, ) from swarms.agents.reasoning_duo import ReasoningDuo -from swarms.structs.output_types import OutputType +from swarms.utils.output_types import OutputType from swarms.agents.agent_judge import AgentJudge agent_types = Literal[ diff --git a/swarms/agents/reasoning_duo.py b/swarms/agents/reasoning_duo.py index 7db84fdc..57ae9849 100644 --- a/swarms/agents/reasoning_duo.py +++ b/swarms/agents/reasoning_duo.py @@ -4,7 +4,7 @@ from loguru import logger from swarms.prompts.reasoning_prompt import REASONING_PROMPT from swarms.structs.agent import Agent -from swarms.structs.output_types import OutputType +from swarms.utils.output_types import OutputType from swarms.structs.conversation import Conversation from swarms.utils.history_output_formatter import ( history_output_formatter, diff --git a/swarms/prompts/agent_conversation_aggregator.py b/swarms/prompts/agent_conversation_aggregator.py new file mode 100644 index 00000000..03d54cb5 --- /dev/null +++ b/swarms/prompts/agent_conversation_aggregator.py @@ -0,0 +1,38 @@ +AGGREGATOR_SYSTEM_PROMPT = """You are a highly skilled Aggregator Agent responsible for analyzing, synthesizing, and summarizing conversations between multiple AI agents. Your primary goal is to distill complex multi-agent interactions into clear, actionable insights. + +Key Responsibilities: +1. Conversation Analysis: + - Identify the main topics and themes discussed + - Track the progression of ideas and problem-solving approaches + - Recognize key decisions and turning points in the conversation + - Note any conflicts, agreements, or important conclusions reached + +2. Agent Contribution Assessment: + - Evaluate each agent's unique contributions to the discussion + - Highlight complementary perspectives and insights + - Identify any knowledge gaps or areas requiring further exploration + - Recognize patterns in agent interactions and collaborative dynamics + +3. Summary Generation Guidelines: + - Begin with a high-level overview of the conversation's purpose and outcome + - Structure the summary in a logical, hierarchical manner + - Prioritize critical information while maintaining context + - Include specific examples or quotes when they significantly impact understanding + - Maintain objectivity while synthesizing different viewpoints + - Highlight actionable insights and next steps if applicable + +4. Quality Standards: + - Ensure accuracy in representing each agent's contributions + - Maintain clarity and conciseness without oversimplifying + - Use consistent terminology throughout the summary + - Preserve important technical details and domain-specific language + - Flag any uncertainties or areas needing clarification + +5. Output Format: + - Present information in a structured, easy-to-read format + - Use bullet points or sections for better readability when appropriate + - Include a brief conclusion or recommendation section if relevant + - Maintain professional and neutral tone throughout + +Remember: Your role is crucial in making complex multi-agent discussions accessible and actionable. Focus on extracting value from the conversation while maintaining the integrity of each agent's contributions. +""" diff --git a/swarms/prompts/prompt.py b/swarms/prompts/prompt.py index 9c4aeb5a..54a28fcf 100644 --- a/swarms/prompts/prompt.py +++ b/swarms/prompts/prompt.py @@ -99,9 +99,6 @@ class Prompt(BaseModel): if self.autosave: self._autosave() - if self.auto_generate_prompt and self.llm: - self.auto_generate_prompt() - def edit_prompt(self, new_content: str) -> None: """ Edits the prompt content and updates the version control. diff --git a/swarms/schemas/agent_completion_response.py b/swarms/schemas/agent_completion_response.py new file mode 100644 index 00000000..fb03fbae --- /dev/null +++ b/swarms/schemas/agent_completion_response.py @@ -0,0 +1,71 @@ +from datetime import datetime +from typing import Any, List, Optional + +from pydantic import BaseModel, Field + + +class Usage(BaseModel): + prompt_tokens: Optional[int] = Field( + default=None, + description="Number of tokens used in the prompt", + ) + completion_tokens: Optional[int] = Field( + default=None, + description="Number of tokens used in the completion", + ) + total_tokens: Optional[int] = Field( + default=None, description="Total number of tokens used" + ) + + +class ModelConfig(BaseModel): + model_name: Optional[str] = Field( + default=None, + description="Name of the model used for generation", + ) + temperature: Optional[float] = Field( + default=None, + description="Temperature setting used for generation", + ) + top_p: Optional[float] = Field( + default=None, description="Top-p setting used for generation" + ) + max_tokens: Optional[int] = Field( + default=None, + description="Maximum number of tokens to generate", + ) + frequency_penalty: Optional[float] = Field( + default=None, + description="Frequency penalty used for generation", + ) + presence_penalty: Optional[float] = Field( + default=None, + description="Presence penalty used for generation", + ) + + +class AgentCompletionResponse(BaseModel): + id: Optional[str] = Field( + default=None, description="Unique identifier for the response" + ) + agent_name: Optional[str] = Field( + default=None, + description="Name of the agent that generated the response", + ) + agent_description: Optional[str] = Field( + default=None, description="Description of the agent" + ) + outputs: Optional[List[Any]] = Field( + default=None, + description="List of outputs generated by the agent", + ) + usage: Optional[Usage] = Field( + default=None, description="Token usage statistics" + ) + model_config: Optional[ModelConfig] = Field( + default=None, description="Model configuration" + ) + timestamp: Optional[str] = Field( + default_factory=lambda: datetime.now().isoformat(), + description="Timestamp of when the response was generated", + ) diff --git a/swarms/schemas/conversation_schema.py b/swarms/schemas/conversation_schema.py new file mode 100644 index 00000000..0057149a --- /dev/null +++ b/swarms/schemas/conversation_schema.py @@ -0,0 +1,9 @@ +from typing import Optional +from pydantic import BaseModel, Field + + +class ConversationSchema(BaseModel): + time_enabled: Optional[bool] = Field(default=False) + message_id_on: Optional[bool] = Field(default=True) + autosave: Optional[bool] = Field(default=False) + count_tokens: Optional[bool] = Field(default=False) diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index 66ebac72..377fb2b4 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -80,6 +80,8 @@ from swarms.structs.swarming_architectures import ( from swarms.structs.auto_swarm_builder import AutoSwarmBuilder from swarms.structs.council_judge import CouncilAsAJudge from swarms.structs.batch_agent_execution import batch_agent_execution +from swarms.structs.ma_blocks import aggregate + __all__ = [ "Agent", @@ -150,4 +152,5 @@ __all__ = [ "AutoSwarmBuilder", "CouncilAsAJudge", "batch_agent_execution", + "aggregate", ] diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 724303ce..4064620b 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -42,7 +42,6 @@ from swarms.schemas.base_schemas import ( ) from swarms.structs.agent_roles import agent_roles from swarms.structs.conversation import Conversation -from swarms.structs.output_types import OutputType from swarms.structs.safe_loading import ( SafeLoaderUtils, SafeStateManager, @@ -78,10 +77,10 @@ from swarms.utils.index import ( format_data_structure, format_dict_to_string, ) +from swarms.schemas.conversation_schema import ConversationSchema +from swarms.utils.output_types import OutputType -# Utils -# Custom stopping condition def stop_when_repeats(response: str) -> bool: # Stop if the word stop appears in the response return "stop" in response.lower() @@ -406,7 +405,8 @@ class Agent: safety_prompt_on: bool = False, random_models_on: bool = False, mcp_config: Optional[MCPConnection] = None, - top_p: float = 0.90, + top_p: Optional[float] = 0.90, + conversation_schema: Optional[ConversationSchema] = None, *args, **kwargs, ): @@ -533,6 +533,7 @@ class Agent: self.random_models_on = random_models_on self.mcp_config = mcp_config self.top_p = top_p + self.conversation_schema = conversation_schema self._cached_llm = ( None # Add this line to cache the LLM instance @@ -612,10 +613,28 @@ class Agent: # Initialize the short term memory memory = Conversation( system_prompt=prompt, - time_enabled=False, user=self.user_name, rules=self.rules, - token_count=False, + token_count=( + self.conversation_schema.count_tokens + if self.conversation_schema + else False + ), + message_id_on=( + self.conversation_schema.message_id_on + if self.conversation_schema + else False + ), + autosave=( + self.conversation_schema.autosave + if self.conversation_schema + else False + ), + time_enabled=( + self.conversation_schema.time_enabled + if self.conversation_schema + else False + ), ) return memory @@ -770,7 +789,7 @@ class Agent: "No agent details found. Using task as fallback for prompt generation." ) self.system_prompt = auto_generate_prompt( - task, self.llm + task=task, model=self._cached_llm ) else: # Combine all available components @@ -2824,3 +2843,6 @@ class Agent: f"{tool_response}", loop_count, ) + + def list_output_types(self): + return OutputType diff --git a/swarms/structs/conversation.py b/swarms/structs/conversation.py index 6889fb03..a87b1579 100644 --- a/swarms/structs/conversation.py +++ b/swarms/structs/conversation.py @@ -1,18 +1,18 @@ import concurrent.futures import datetime -import hashlib import json import os import threading import uuid from typing import ( TYPE_CHECKING, - Any, + Callable, Dict, List, Optional, Union, Literal, + Any, ) import yaml @@ -33,6 +33,25 @@ def generate_conversation_id(): return str(uuid.uuid4()) +def get_conversation_dir(): + """Get the directory for storing conversation logs.""" + # Get the current working directory + conversation_dir = os.path.join(os.getcwd(), "conversations") + try: + os.makedirs(conversation_dir, mode=0o755, exist_ok=True) + except Exception as e: + logger.error( + f"Failed to create conversations directory: {str(e)}" + ) + # Fallback to the same directory as the script + conversation_dir = os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "conversations", + ) + os.makedirs(conversation_dir, mode=0o755, exist_ok=True) + return conversation_dir + + # Define available providers providers = Literal["mem0", "in-memory"] @@ -58,10 +77,6 @@ class Conversation(BaseStructure): save_as_json_bool (bool): Flag to save conversation history as JSON. token_count (bool): Flag to enable token counting for messages. conversation_history (list): List to store the history of messages. - cache_enabled (bool): Flag to enable prompt caching. - cache_stats (dict): Statistics about cache usage. - cache_lock (threading.Lock): Lock for thread-safe cache operations. - conversations_dir (str): Directory to store cached conversations. """ def __init__( @@ -70,20 +85,21 @@ class Conversation(BaseStructure): name: str = None, system_prompt: Optional[str] = None, time_enabled: bool = False, - autosave: bool = False, + autosave: bool = False, # Changed default to False + save_enabled: bool = False, # New parameter to control if saving is enabled save_filepath: str = None, - tokenizer: Any = None, + load_filepath: str = None, # New parameter to specify which file to load from + tokenizer: Callable = None, context_length: int = 8192, rules: str = None, custom_rules_prompt: str = None, user: str = "User:", - auto_save: bool = True, - save_as_yaml: bool = True, + save_as_yaml: bool = False, save_as_json_bool: bool = False, token_count: bool = True, - cache_enabled: bool = True, - conversations_dir: Optional[str] = None, provider: providers = "in-memory", + conversations_dir: Optional[str] = None, + message_id_on: bool = False, *args, **kwargs, ): @@ -95,73 +111,88 @@ class Conversation(BaseStructure): self.system_prompt = system_prompt self.time_enabled = time_enabled self.autosave = autosave - self.save_filepath = save_filepath + self.save_enabled = save_enabled + self.conversations_dir = conversations_dir + self.message_id_on = message_id_on + + # Handle save filepath + if save_enabled and save_filepath: + self.save_filepath = save_filepath + elif save_enabled and conversations_dir: + self.save_filepath = os.path.join( + conversations_dir, f"{self.id}.json" + ) + else: + self.save_filepath = None + + self.load_filepath = load_filepath self.conversation_history = [] self.tokenizer = tokenizer self.context_length = context_length self.rules = rules self.custom_rules_prompt = custom_rules_prompt self.user = user - self.auto_save = auto_save self.save_as_yaml = save_as_yaml self.save_as_json_bool = save_as_json_bool self.token_count = token_count - self.cache_enabled = cache_enabled self.provider = provider - self.cache_stats = { - "hits": 0, - "misses": 0, - "cached_tokens": 0, - "total_tokens": 0, - } - self.cache_lock = threading.Lock() - self.conversations_dir = conversations_dir + # Create conversation directory if saving is enabled + if self.save_enabled and self.conversations_dir: + os.makedirs(self.conversations_dir, exist_ok=True) + + # Try to load existing conversation or initialize new one self.setup() def setup(self): - # Set up conversations directory - self.conversations_dir = ( - self.conversations_dir - or os.path.join( - os.path.expanduser("~"), ".swarms", "conversations" - ) - ) - os.makedirs(self.conversations_dir, exist_ok=True) - - # Try to load existing conversation if it exists - conversation_file = os.path.join( - self.conversations_dir, f"{self.name}.json" - ) - if os.path.exists(conversation_file): - with open(conversation_file, "r") as f: - saved_data = json.load(f) - # Update attributes from saved data - for key, value in saved_data.get( - "metadata", {} - ).items(): - if hasattr(self, key): - setattr(self, key, value) - self.conversation_history = saved_data.get( - "history", [] + """Set up the conversation by either loading existing data or initializing new.""" + if self.load_filepath and os.path.exists(self.load_filepath): + try: + self.load_from_json(self.load_filepath) + logger.info( + f"Loaded existing conversation from {self.load_filepath}" + ) + except Exception as e: + logger.error(f"Failed to load conversation: {str(e)}") + self._initialize_new_conversation() + elif self.save_filepath and os.path.exists( + self.save_filepath + ): + try: + self.load_from_json(self.save_filepath) + logger.info( + f"Loaded existing conversation from {self.save_filepath}" ) + except Exception as e: + logger.error(f"Failed to load conversation: {str(e)}") + self._initialize_new_conversation() else: - # If system prompt is not None, add it to the conversation history - if self.system_prompt is not None: - self.add("System", self.system_prompt) - - if self.rules is not None: - self.add(self.user or "User", self.rules) - - if self.custom_rules_prompt is not None: - self.add( - self.user or "User", self.custom_rules_prompt + self._initialize_new_conversation() + + def _initialize_new_conversation(self): + """Initialize a new conversation with system prompt and rules.""" + if self.system_prompt is not None: + self.add("System", self.system_prompt) + + if self.rules is not None: + self.add(self.user or "User", self.rules) + + if self.custom_rules_prompt is not None: + self.add(self.user or "User", self.custom_rules_prompt) + + if self.tokenizer is not None: + self.truncate_memory_with_tokenizer() + + def _autosave(self): + """Automatically save the conversation if autosave is enabled.""" + if self.autosave and self.save_filepath: + try: + self.save_as_json(self.save_filepath) + except Exception as e: + logger.error( + f"Failed to autosave conversation: {str(e)}" ) - # If tokenizer then truncate - if self.tokenizer is not None: - self.truncate_memory_with_tokenizer() - def mem0_provider(self): try: from mem0 import AsyncMemory @@ -180,104 +211,10 @@ class Conversation(BaseStructure): ) return None - def _generate_cache_key( - self, content: Union[str, dict, list] - ) -> str: - """Generate a cache key for the given content. - - Args: - content (Union[str, dict, list]): The content to generate a cache key for. - - Returns: - str: The cache key. - """ - if isinstance(content, (dict, list)): - content = json.dumps(content, sort_keys=True) - return hashlib.md5(content.encode()).hexdigest() - - def _get_cached_tokens( - self, content: Union[str, dict, list] - ) -> Optional[int]: - """Get the number of cached tokens for the given content. - - Args: - content (Union[str, dict, list]): The content to check. - - Returns: - Optional[int]: The number of cached tokens, or None if not cached. - """ - if not self.cache_enabled: - return None - - with self.cache_lock: - cache_key = self._generate_cache_key(content) - if cache_key in self.cache_stats: - self.cache_stats["hits"] += 1 - return self.cache_stats[cache_key] - self.cache_stats["misses"] += 1 - return None - - def _update_cache_stats( - self, content: Union[str, dict, list], token_count: int - ): - """Update cache statistics for the given content. - - Args: - content (Union[str, dict, list]): The content to update stats for. - token_count (int): The number of tokens in the content. - """ - if not self.cache_enabled: - return - - with self.cache_lock: - cache_key = self._generate_cache_key(content) - self.cache_stats[cache_key] = token_count - self.cache_stats["cached_tokens"] += token_count - self.cache_stats["total_tokens"] += token_count - - def _save_to_cache(self): - """Save the current conversation state to the cache directory.""" - if not self.conversations_dir: - return - - conversation_file = os.path.join( - self.conversations_dir, f"{self.name}.json" - ) - - # Prepare metadata - metadata = { - "id": self.id, - "name": self.name, - "system_prompt": self.system_prompt, - "time_enabled": self.time_enabled, - "autosave": self.autosave, - "save_filepath": self.save_filepath, - "context_length": self.context_length, - "rules": self.rules, - "custom_rules_prompt": self.custom_rules_prompt, - "user": self.user, - "auto_save": self.auto_save, - "save_as_yaml": self.save_as_yaml, - "save_as_json_bool": self.save_as_json_bool, - "token_count": self.token_count, - "cache_enabled": self.cache_enabled, - } - - # Prepare data to save - save_data = { - "metadata": metadata, - "history": self.conversation_history, - "cache_stats": self.cache_stats, - } - - # Save to file - with open(conversation_file, "w") as f: - json.dump(save_data, f, indent=4) - def add_in_memory( self, role: str, - content: Union[str, dict, list], + content: Union[str, dict, list, Any], *args, **kwargs, ): @@ -287,39 +224,32 @@ class Conversation(BaseStructure): role (str): The role of the speaker (e.g., 'User', 'System'). content (Union[str, dict, list]): The content of the message to be added. """ - # Base message with role + # Base message with role and timestamp message = { "role": role, + "content": content, } - # Handle different content types - if isinstance(content, dict) or isinstance(content, list): - message["content"] = content - elif self.time_enabled: - message["content"] = ( - f"Time: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} \n {content}" - ) - else: - message["content"] = content + if self.time_enabled: + message["timestamp"] = datetime.datetime.now().isoformat() - # Check cache for token count - cached_tokens = self._get_cached_tokens(content) - if cached_tokens is not None: - message["token_count"] = cached_tokens - message["cached"] = True - else: - message["cached"] = False + if self.message_id_on: + message["message_id"] = str(uuid.uuid4()) - # Add message to appropriate backend + # Add message to conversation history self.conversation_history.append(message) - if self.token_count is True and not message.get( - "cached", False - ): + if self.token_count is True: self._count_tokens(content, message) - # Save to cache after adding message - self._save_to_cache() + # Autosave after adding message, but only if saving is enabled + if self.autosave and self.save_enabled and self.save_filepath: + try: + self.save_as_json(self.save_filepath) + except Exception as e: + logger.error( + f"Failed to autosave conversation: {str(e)}" + ) def add_mem0( self, @@ -367,8 +297,6 @@ class Conversation(BaseStructure): tokens = count_tokens(any_to_str(content)) # Update the message that's already in the conversation history message["token_count"] = int(tokens) - # Update cache stats - self._update_cache_stats(content, int(tokens)) # If autosave is enabled, save after token count is updated if self.autosave: @@ -413,7 +341,6 @@ class Conversation(BaseStructure): index (str): Index of the message to delete. """ self.conversation_history.pop(index) - self._save_to_cache() def update(self, index: str, role, content): """Update a message in the conversation history. @@ -427,7 +354,6 @@ class Conversation(BaseStructure): "role": role, "content": content, } - self._save_to_cache() def query(self, index: str): """Query a message in the conversation history. @@ -462,9 +388,26 @@ class Conversation(BaseStructure): detailed (bool, optional): Flag to display detailed information. Defaults to False. """ for message in self.conversation_history: - formatter.print_panel( - f"{message['role']}: {message['content']}\n\n" - ) + content = message["content"] + role = message["role"] + + # Format the message content + if isinstance(content, (dict, list)): + content = json.dumps(content, indent=2) + + # Create the display string + display_str = f"{role}: {content}" + + # Add details if requested + if detailed: + display_str += f"\nTimestamp: {message.get('timestamp', 'Unknown')}" + display_str += f"\nMessage ID: {message.get('message_id', 'Unknown')}" + if "token_count" in message: + display_str += ( + f"\nTokens: {message['token_count']}" + ) + + formatter.print_panel(display_str) def export_conversation(self, filename: str, *args, **kwargs): """Export the conversation history to a file. @@ -531,9 +474,47 @@ class Conversation(BaseStructure): Args: filename (str): Filename to save the conversation history. """ - if filename is not None: - with open(filename, "w") as f: - json.dump(self.conversation_history, f) + # Don't save if saving is disabled + if not self.save_enabled: + return + + save_path = filename or self.save_filepath + if save_path is not None: + try: + # Prepare metadata + metadata = { + "id": self.id, + "name": self.name, + "created_at": datetime.datetime.now().isoformat(), + "system_prompt": self.system_prompt, + "rules": self.rules, + "custom_rules_prompt": self.custom_rules_prompt, + } + + # Prepare save data + save_data = { + "metadata": metadata, + "history": self.conversation_history, + } + + # Create directory if it doesn't exist + os.makedirs( + os.path.dirname(save_path), + mode=0o755, + exist_ok=True, + ) + + # Write directly to file + with open(save_path, "w") as f: + json.dump(save_data, f, indent=2) + + # Only log explicit saves, not autosaves + if not self.autosave: + logger.info( + f"Successfully saved conversation to {save_path}" + ) + except Exception as e: + logger.error(f"Failed to save conversation: {str(e)}") def load_from_json(self, filename: str): """Load the conversation history from a JSON file. @@ -541,9 +522,32 @@ class Conversation(BaseStructure): Args: filename (str): Filename to load from. """ - if filename is not None: - with open(filename) as f: - self.conversation_history = json.load(f) + if filename is not None and os.path.exists(filename): + try: + with open(filename) as f: + data = json.load(f) + + # Load metadata + metadata = data.get("metadata", {}) + self.id = metadata.get("id", self.id) + self.name = metadata.get("name", self.name) + self.system_prompt = metadata.get( + "system_prompt", self.system_prompt + ) + self.rules = metadata.get("rules", self.rules) + self.custom_rules_prompt = metadata.get( + "custom_rules_prompt", self.custom_rules_prompt + ) + + # Load conversation history + self.conversation_history = data.get("history", []) + + logger.info( + f"Successfully loaded conversation from {filename}" + ) + except Exception as e: + logger.error(f"Failed to load conversation: {str(e)}") + raise def search_keyword_in_conversation(self, keyword: str): """Search for a keyword in the conversation history. @@ -600,7 +604,6 @@ class Conversation(BaseStructure): def clear(self): """Clear the conversation history.""" self.conversation_history = [] - self._save_to_cache() def to_json(self): """Convert the conversation history to a JSON string. @@ -608,7 +611,7 @@ class Conversation(BaseStructure): Returns: str: The conversation history as a JSON string. """ - return json.dumps(self.conversation_history) + return json.dumps(self.conversation_history, indent=4) def to_dict(self): """Convert the conversation history to a dictionary. @@ -759,79 +762,121 @@ class Conversation(BaseStructure): """ self.conversation_history.extend(messages) - def get_cache_stats(self) -> Dict[str, int]: - """Get statistics about cache usage. - - Returns: - Dict[str, int]: Statistics about cache usage. - """ - with self.cache_lock: - return { - "hits": self.cache_stats["hits"], - "misses": self.cache_stats["misses"], - "cached_tokens": self.cache_stats["cached_tokens"], - "total_tokens": self.cache_stats["total_tokens"], - "hit_rate": ( - self.cache_stats["hits"] - / ( - self.cache_stats["hits"] - + self.cache_stats["misses"] - ) - if ( - self.cache_stats["hits"] - + self.cache_stats["misses"] - ) - > 0 - else 0 - ), - } + def clear_memory(self): + """Clear the memory of the conversation.""" + self.conversation_history = [] @classmethod def load_conversation( - cls, name: str, conversations_dir: Optional[str] = None + cls, + name: str, + conversations_dir: Optional[str] = None, + load_filepath: Optional[str] = None, ) -> "Conversation": - """Load a conversation from the cache by name. + """Load a conversation from saved file by name or specific file. Args: name (str): Name of the conversation to load - conversations_dir (Optional[str]): Directory containing cached conversations + conversations_dir (Optional[str]): Directory containing conversations + load_filepath (Optional[str]): Specific file to load from Returns: Conversation: The loaded conversation object """ - return cls(name=name, conversations_dir=conversations_dir) + if load_filepath: + return cls( + name=name, + load_filepath=load_filepath, + save_enabled=False, # Don't enable saving when loading specific file + ) + + conv_dir = conversations_dir or get_conversation_dir() + # Try loading by name first + filepath = os.path.join(conv_dir, f"{name}.json") + + # If not found by name, try loading by ID + if not os.path.exists(filepath): + filepath = os.path.join(conv_dir, f"{name}") + if not os.path.exists(filepath): + logger.warning( + f"No conversation found with name or ID: {name}" + ) + return cls( + name=name, + conversations_dir=conv_dir, + save_enabled=True, + ) + + return cls( + name=name, + conversations_dir=conv_dir, + load_filepath=filepath, + save_enabled=True, + ) @classmethod - def list_cached_conversations( + def list_conversations( cls, conversations_dir: Optional[str] = None - ) -> List[str]: - """List all cached conversations. + ) -> List[Dict[str, str]]: + """List all saved conversations. Args: - conversations_dir (Optional[str]): Directory containing cached conversations + conversations_dir (Optional[str]): Directory containing conversations Returns: - List[str]: List of conversation names (without .json extension) + List[Dict[str, str]]: List of conversation metadata """ - if conversations_dir is None: - conversations_dir = os.path.join( - os.path.expanduser("~"), ".swarms", "conversations" - ) - - if not os.path.exists(conversations_dir): + conv_dir = conversations_dir or get_conversation_dir() + if not os.path.exists(conv_dir): return [] conversations = [] - for file in os.listdir(conversations_dir): - if file.endswith(".json"): - conversations.append( - file[:-5] - ) # Remove .json extension - return conversations + seen_ids = ( + set() + ) # Track seen conversation IDs to avoid duplicates + + for filename in os.listdir(conv_dir): + if filename.endswith(".json"): + try: + filepath = os.path.join(conv_dir, filename) + with open(filepath) as f: + data = json.load(f) + metadata = data.get("metadata", {}) + conv_id = metadata.get("id") + name = metadata.get("name") + created_at = metadata.get("created_at") + + # Skip if we've already seen this ID or if required fields are missing + if ( + not all([conv_id, name, created_at]) + or conv_id in seen_ids + ): + continue + + seen_ids.add(conv_id) + conversations.append( + { + "id": conv_id, + "name": name, + "created_at": created_at, + "filepath": filepath, + } + ) + except json.JSONDecodeError: + logger.warning( + f"Skipping corrupted conversation file: {filename}" + ) + continue + except Exception as e: + logger.error( + f"Failed to read conversation {filename}: {str(e)}" + ) + continue - def clear_memory(self): - """Clear the memory of the conversation.""" - self.conversation_history = [] + # Sort by creation date, newest first + return sorted( + conversations, key=lambda x: x["created_at"], reverse=True + ) # # Example usage diff --git a/swarms/structs/hiearchical_swarm.py b/swarms/structs/hiearchical_swarm.py index a8571928..af89e163 100644 --- a/swarms/structs/hiearchical_swarm.py +++ b/swarms/structs/hiearchical_swarm.py @@ -7,7 +7,7 @@ from pydantic import BaseModel, Field from swarms.structs.agent import Agent from swarms.structs.base_swarm import BaseSwarm from swarms.structs.conversation import Conversation -from swarms.structs.output_types import OutputType +from swarms.utils.output_types import OutputType from swarms.utils.any_to_str import any_to_str from swarms.utils.formatter import formatter diff --git a/swarms/structs/hybrid_hiearchical_peer_swarm.py b/swarms/structs/hybrid_hiearchical_peer_swarm.py index 650b3024..9876477a 100644 --- a/swarms/structs/hybrid_hiearchical_peer_swarm.py +++ b/swarms/structs/hybrid_hiearchical_peer_swarm.py @@ -1,5 +1,5 @@ import os -from typing import List, Literal +from typing import List from swarms.structs.agent import Agent from swarms.structs.conversation import Conversation from swarms.structs.multi_agent_exec import get_swarms_info @@ -9,23 +9,7 @@ from swarms.utils.history_output_formatter import ( ) from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Union, Callable - - -HistoryOutputType = Literal[ - "list", - "dict", - "dictionary", - "string", - "str", - "final", - "last", - "json", - "all", - "yaml", - # "dict-final", - "dict-all-except-first", - "str-all-except-first", -] +from swarms.utils.history_output_formatter import HistoryOutputType tools = [ { diff --git a/swarms/structs/ma_blocks.py b/swarms/structs/ma_blocks.py new file mode 100644 index 00000000..60b4a56a --- /dev/null +++ b/swarms/structs/ma_blocks.py @@ -0,0 +1,84 @@ +from swarms.structs.agent import Agent +from typing import List, Callable +from swarms.structs.conversation import Conversation +from swarms.structs.multi_agent_exec import run_agents_concurrently +from swarms.utils.history_output_formatter import ( + history_output_formatter, + HistoryOutputType, +) + +from swarms.prompts.agent_conversation_aggregator import ( + AGGREGATOR_SYSTEM_PROMPT, +) + + +def aggregator_agent_task_prompt( + task: str, workers: List[Agent], conversation: Conversation +): + return f""" + Please analyze and summarize the following multi-agent conversation, following your guidelines for comprehensive synthesis: + + Conversation Context: + Original Task: {task} + Number of Participating Agents: {len(workers)} + + Conversation Content: + {conversation.get_str()} + + Please provide a 3,000 word comprehensive summary report of the conversation. + """ + + +def aggregate( + workers: List[Callable], + task: str = None, + type: HistoryOutputType = "all", + aggregator_model_name: str = "anthropic/claude-3-sonnet-20240229", +): + """ + Aggregate a list of tasks into a single task. + """ + + if task is None: + raise ValueError("Task is required in the aggregator block") + + if workers is None: + raise ValueError( + "Workers is required in the aggregator block" + ) + + if not isinstance(workers, list): + raise ValueError("Workers must be a list of Callable") + + if not all(isinstance(worker, Callable) for worker in workers): + raise ValueError("Workers must be a list of Callable") + + conversation = Conversation() + + aggregator_agent = Agent( + agent_name="Aggregator", + agent_description="Expert agent specializing in analyzing and synthesizing multi-agent conversations", + system_prompt=AGGREGATOR_SYSTEM_PROMPT, + max_loops=1, + model_name=aggregator_model_name, + output_type="final", + max_tokens=4000, + ) + + results = run_agents_concurrently(agents=workers, task=task) + + # Zip the results with the agents + for result, agent in zip(results, workers): + conversation.add(content=result, role=agent.agent_name) + + final_result = aggregator_agent.run( + task=aggregator_agent_task_prompt(task, workers, conversation) + ) + + conversation.add( + content=final_result, role=aggregator_agent.agent_name + ) + + return history_output_formatter( + conversation=conversation, type=type + ) diff --git a/swarms/structs/majority_voting.py b/swarms/structs/majority_voting.py index cba42331..09c12520 100644 --- a/swarms/structs/majority_voting.py +++ b/swarms/structs/majority_voting.py @@ -9,7 +9,7 @@ from typing import Any, Callable, List, Optional from swarms.structs.agent import Agent from swarms.structs.conversation import Conversation from swarms.structs.multi_agent_exec import run_agents_concurrently -from swarms.structs.output_types import OutputType +from swarms.utils.output_types import OutputType from swarms.utils.formatter import formatter from swarms.utils.loguru_logger import initialize_logger diff --git a/swarms/structs/mixture_of_agents.py b/swarms/structs/mixture_of_agents.py index aa11cf62..9c6b8756 100644 --- a/swarms/structs/mixture_of_agents.py +++ b/swarms/structs/mixture_of_agents.py @@ -7,7 +7,7 @@ from swarms.structs.agent import Agent from swarms.prompts.ag_prompt import aggregator_system_prompt_main from swarms.utils.loguru_logger import initialize_logger import concurrent.futures -from swarms.structs.output_types import OutputType +from swarms.utils.output_types import OutputType from swarms.structs.conversation import Conversation from swarms.utils.history_output_formatter import ( history_output_formatter, diff --git a/swarms/structs/multi_agent_exec.py b/swarms/structs/multi_agent_exec.py index 9e75b730..93d363f8 100644 --- a/swarms/structs/multi_agent_exec.py +++ b/swarms/structs/multi_agent_exec.py @@ -1,3 +1,4 @@ +import concurrent.futures import asyncio import os import threading @@ -5,7 +6,7 @@ from concurrent.futures import ( ThreadPoolExecutor, ) from dataclasses import dataclass -from typing import Any, Callable, List, Union +from typing import Any, Callable, List, Optional, Union import psutil @@ -68,44 +69,42 @@ async def run_agents_concurrently_async( def run_agents_concurrently( agents: List[AgentType], task: str, - batch_size: int = None, - max_workers: int = None, + max_workers: Optional[int] = None, ) -> List[Any]: """ - Optimized concurrent agent runner using both uvloop and ThreadPoolExecutor. + Optimized concurrent agent runner using ThreadPoolExecutor. Args: agents: List of Agent instances to run concurrently task: Task string to execute - batch_size: Number of agents to run in parallel in each batch (defaults to CPU count) - max_workers: Maximum number of threads in the executor (defaults to CPU count * 2) + max_workers: Maximum number of threads in the executor (defaults to 95% of CPU cores) Returns: List of outputs from each agent """ - # Optimize defaults based on system resources - cpu_cores = os.cpu_count() - batch_size = batch_size or cpu_cores - max_workers = max_workers or cpu_cores * 2 + if max_workers is None: + # 95% of the available CPU cores + num_cores = os.cpu_count() + max_workers = int(num_cores * 0.95) if num_cores else 1 results = [] - # Get or create event loop - try: - loop = asyncio.get_event_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - # Create a shared thread pool executor with optimal worker count - with ThreadPoolExecutor(max_workers=max_workers) as executor: - # Process agents in batches - for i in range(0, len(agents), batch_size): - batch = agents[i : i + batch_size] - batch_results = loop.run_until_complete( - run_agents_concurrently_async(batch, task, executor) - ) - results.extend(batch_results) + with concurrent.futures.ThreadPoolExecutor( + max_workers=max_workers + ) as executor: + # Submit all tasks and get futures + futures = [ + executor.submit(agent.run, task) for agent in agents + ] + + # Wait for all futures to complete and get results + for future in concurrent.futures.as_completed(futures): + try: + result = future.result() + results.append(result) + except Exception as e: + # Append the error if an agent fails + results.append(e) return results diff --git a/swarms/structs/multi_agent_router.py b/swarms/structs/multi_agent_router.py index 173e267e..d496a5f0 100644 --- a/swarms/structs/multi_agent_router.py +++ b/swarms/structs/multi_agent_router.py @@ -16,7 +16,7 @@ from pydantic import BaseModel, Field from swarms.utils.function_caller_model import OpenAIFunctionCaller from swarms.structs.agent import Agent from swarms.structs.conversation import Conversation -from swarms.structs.output_types import OutputType +from swarms.utils.output_types import OutputType from swarms.utils.any_to_str import any_to_str from swarms.utils.history_output_formatter import ( history_output_formatter, diff --git a/swarms/structs/output_types.py b/swarms/structs/output_types.py deleted file mode 100644 index ca1376a6..00000000 --- a/swarms/structs/output_types.py +++ /dev/null @@ -1,6 +0,0 @@ -from swarms.utils.history_output_formatter import ( - HistoryOutputType as OutputType, -) - -# Use the OutputType for type annotations -output_type: OutputType # OutputType now includes 'xml' diff --git a/swarms/structs/rearrange.py b/swarms/structs/rearrange.py index 7dd5d649..b9936dcf 100644 --- a/swarms/structs/rearrange.py +++ b/swarms/structs/rearrange.py @@ -15,7 +15,7 @@ from swarms.utils.history_output_formatter import ( from swarms.utils.loguru_logger import initialize_logger from swarms.telemetry.main import log_agent_data from swarms.structs.conversation import Conversation -from swarms.structs.output_types import OutputType +from swarms.utils.output_types import OutputType from swarms.structs.multi_agent_exec import get_agents_info logger = initialize_logger(log_folder="rearrange") diff --git a/swarms/structs/sequential_workflow.py b/swarms/structs/sequential_workflow.py index 1f4ce76a..8eaf578a 100644 --- a/swarms/structs/sequential_workflow.py +++ b/swarms/structs/sequential_workflow.py @@ -2,7 +2,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed from typing import List, Optional from swarms.structs.agent import Agent -from swarms.structs.output_types import OutputType +from swarms.utils.output_types import OutputType from swarms.structs.rearrange import AgentRearrange from swarms.utils.loguru_logger import initialize_logger diff --git a/swarms/structs/swarm_matcher.py b/swarms/structs/swarm_matcher.py index 37ccf2cb..6d32d8d2 100644 --- a/swarms/structs/swarm_matcher.py +++ b/swarms/structs/swarm_matcher.py @@ -89,7 +89,7 @@ class SwarmMatcher: Returns: np.ndarray: The embedding vector for the text. """ - import numpy as np + logger.debug(f"Getting embedding for text: {text[:50]}...") try: inputs = self.tokenizer( @@ -142,6 +142,7 @@ class SwarmMatcher: Tuple[str, float]: A tuple containing the name of the best matching swarm type and the score. """ import numpy as np + logger.debug(f"Finding best match for task: {task[:50]}...") try: task_embedding = self.get_embedding(task) diff --git a/swarms/structs/swarm_router.py b/swarms/structs/swarm_router.py index f623eba5..06307b90 100644 --- a/swarms/structs/swarm_router.py +++ b/swarms/structs/swarm_router.py @@ -20,7 +20,7 @@ from swarms.structs.rearrange import AgentRearrange from swarms.structs.sequential_workflow import SequentialWorkflow from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm from swarms.structs.swarm_matcher import swarm_matcher -from swarms.structs.output_types import OutputType +from swarms.utils.output_types import OutputType from swarms.utils.loguru_logger import initialize_logger from swarms.structs.malt import MALT from swarms.structs.deep_research_swarm import DeepResearchSwarm diff --git a/swarms/structs/swarming_architectures.py b/swarms/structs/swarming_architectures.py index 21f5f02e..f34fc844 100644 --- a/swarms/structs/swarming_architectures.py +++ b/swarms/structs/swarming_architectures.py @@ -9,7 +9,7 @@ from swarms.structs.conversation import Conversation from swarms.utils.history_output_formatter import ( history_output_formatter, ) -from swarms.structs.output_types import OutputType +from swarms.utils.output_types import OutputType logger = initialize_logger(log_folder="swarming_architectures") diff --git a/swarms/utils/__init__.py b/swarms/utils/__init__.py index 6632b26b..b604186d 100644 --- a/swarms/utils/__init__.py +++ b/swarms/utils/__init__.py @@ -17,6 +17,10 @@ from swarms.utils.pdf_to_text import pdf_to_text from swarms.utils.try_except_wrapper import try_except_wrapper from swarms.utils.calculate_func_metrics import profile_func from swarms.utils.litellm_tokenizer import count_tokens +from swarms.utils.output_types import HistoryOutputType +from swarms.utils.history_output_formatter import ( + history_output_formatter, +) __all__ = [ @@ -35,4 +39,6 @@ __all__ = [ "try_except_wrapper", "profile_func", "count_tokens", + "HistoryOutputType", + "history_output_formatter", ] diff --git a/swarms/utils/generate_keys.py b/swarms/utils/generate_keys.py index c0fb3779..c92d2b2b 100644 --- a/swarms/utils/generate_keys.py +++ b/swarms/utils/generate_keys.py @@ -1,6 +1,5 @@ import secrets import string -import re def generate_api_key(prefix: str = "sk-", length: int = 32) -> str: @@ -36,29 +35,3 @@ def generate_api_key(prefix: str = "sk-", length: int = 32) -> str: api_key = f"{prefix}{random_part}" return api_key - - -def validate_api_key(api_key: str, prefix: str = "sk-") -> bool: - """ - Validate if an API key matches the expected format. - - Args: - api_key (str): The API key to validate - prefix (str): The expected prefix (default: "sk-") - - Returns: - bool: True if the API key is valid, False otherwise - """ - if not isinstance(api_key, str): - return False - - # Check if key starts with prefix - if not api_key.startswith(prefix): - return False - - # Check if the rest of the key contains only alphanumeric characters - random_part = api_key[len(prefix) :] - if not re.match(r"^[a-zA-Z0-9]+$", random_part): - return False - - return True diff --git a/swarms/utils/history_output_formatter.py b/swarms/utils/history_output_formatter.py index ea9d8d7f..b4b625b3 100644 --- a/swarms/utils/history_output_formatter.py +++ b/swarms/utils/history_output_formatter.py @@ -1,28 +1,11 @@ import yaml -from swarms.structs.conversation import Conversation -from typing import Literal, Union, List, Dict, Any +from typing import Union, List, Dict, Any from swarms.utils.xml_utils import to_xml_string - -HistoryOutputType = Literal[ - "list", - "dict", - "dictionary", - "string", - "str", - "final", - "last", - "json", - "all", - "yaml", - "xml", - # "dict-final", - "dict-all-except-first", - "str-all-except-first", -] +from swarms.utils.output_types import HistoryOutputType def history_output_formatter( - conversation: Conversation, type: HistoryOutputType = "list" + conversation: callable, type: HistoryOutputType = "list" ) -> Union[List[Dict[str, Any]], Dict[str, Any], str]: if type == "list": return conversation.return_messages_as_list() diff --git a/swarms/utils/output_types.py b/swarms/utils/output_types.py new file mode 100644 index 00000000..c09c4a6f --- /dev/null +++ b/swarms/utils/output_types.py @@ -0,0 +1,23 @@ +from typing import Literal + +HistoryOutputType = Literal[ + "list", + "dict", + "dictionary", + "string", + "str", + "final", + "last", + "json", + "all", + "yaml", + "xml", + # "dict-final", + "dict-all-except-first", + "str-all-except-first", + "basemodel", +] + +OutputType = HistoryOutputType + +output_type: HistoryOutputType # OutputType now includes 'xml' diff --git a/swarms/utils/vllm_wrapper.py b/swarms/utils/vllm_wrapper.py index 8949a050..863bd322 100644 --- a/swarms/utils/vllm_wrapper.py +++ b/swarms/utils/vllm_wrapper.py @@ -1,3 +1,5 @@ +import os +import concurrent.futures from typing import List, Optional, Dict, Any from loguru import logger @@ -131,16 +133,16 @@ class VLLMWrapper: Returns: List[str]: List of model responses. """ - logger.info( - f"Running tasks in batches of size {batch_size}. Total tasks: {len(tasks)}" - ) - results = [] - - for i in range(0, len(tasks), batch_size): - batch = tasks[i : i + batch_size] - for task in batch: - logger.info(f"Running task: {task}") - results.append(self.run(task)) - - logger.info("Completed all tasks.") - return results + # Fetch 95% of the available CPU cores + num_cores = os.cpu_count() + num_workers = int(num_cores * 0.95) + with concurrent.futures.ThreadPoolExecutor( + max_workers=num_workers + ) as executor: + futures = [ + executor.submit(self.run, task) for task in tasks + ] + return [ + future.result() + for future in concurrent.futures.as_completed(futures) + ] diff --git a/swarms/utils/wrapper_clusterop.py b/swarms/utils/wrapper_clusterop.py deleted file mode 100644 index 6119c532..00000000 --- a/swarms/utils/wrapper_clusterop.py +++ /dev/null @@ -1,127 +0,0 @@ -import platform -from typing import Any - - -from clusterops import ( - execute_on_gpu, - execute_on_multiple_gpus, - list_available_gpus, - execute_with_all_cpu_cores, - execute_on_cpu, -) -from swarms.utils.loguru_logger import initialize_logger - -logger = initialize_logger(log_folder="clusterops_wrapper") - - -def exec_callable_with_clusterops( - device: str = "cpu", - device_id: int = 1, - all_cores: bool = True, - all_gpus: bool = False, - func: callable = None, - enable_logging: bool = True, - *args, - **kwargs, -) -> Any: - """ - Executes a given function on a specified device, either CPU or GPU. - - This method attempts to execute a given function on a specified device, either CPU or GPU. It logs the device selection and the number of cores or GPU ID used. If the device is set to CPU, it can use all available cores or a specific core specified by `device_id`. If the device is set to GPU, it uses the GPU specified by `device_id`. - - Args: - device (str, optional): The device to use for execution. Defaults to "cpu". - device_id (int, optional): The ID of the GPU to use if device is set to "gpu". Defaults to 0. - all_cores (bool, optional): If True, uses all available CPU cores. Defaults to True. - all_gpus (bool, optional): If True, uses all available GPUs. Defaults to False. - func (callable): The function to execute. - enable_logging (bool, optional): If True, enables logging. Defaults to True. - *args: Additional positional arguments to be passed to the execution method. - **kwargs: Additional keyword arguments to be passed to the execution method. - - Returns: - Any: The result of the execution. - - Raises: - ValueError: If an invalid device is specified. - Exception: If any other error occurs during execution. - """ - if func is None: - raise ValueError("A callable function must be provided") - - try: - if enable_logging: - logger.info(f"Attempting to run on device: {device}") - device = device.lower() - - # Check if the platform is Windows and do nothing if true - if platform.system() == "Windows": - if enable_logging: - logger.info( - "Platform is Windows, not executing on device." - ) - return None - - if device == "cpu": - if enable_logging: - logger.info("Device set to CPU") - - if all_cores: - if enable_logging: - logger.info("Using all CPU cores") - return execute_with_all_cpu_cores( - func, *args, **kwargs - ) - - if device_id is not None: - if enable_logging: - logger.info( - f"Using specific CPU core: {device_id}" - ) - return execute_on_cpu( - device_id, func, *args, **kwargs - ) - - elif device == "gpu": - if enable_logging: - logger.info("Device set to GPU") - - if all_gpus: - if enable_logging: - logger.info("Using all available GPUs") - gpus = [int(gpu) for gpu in list_available_gpus()] - return execute_on_multiple_gpus( - gpus, func, *args, **kwargs - ) - - if enable_logging: - logger.info(f"Using GPU device ID: {device_id}") - return execute_on_gpu(device_id, func, *args, **kwargs) - - else: - raise ValueError( - f"Invalid device specified: {device}. Supported devices are 'cpu' and 'gpu'." - ) - - except ValueError as e: - if enable_logging: - logger.error( - f"Invalid device or configuration specified: {e}" - ) - raise - except Exception as e: - if enable_logging: - logger.error(f"An error occurred during execution: {e}") - raise - - -# def test_clusterops(x): -# return x + 1 - -# example = exec_callable_with_clusterops( -# device="cpu", -# all_cores=True, -# func = test_clusterops, -# ) - -# print(example) diff --git a/tests/structs/test_conversation.py b/tests/structs/test_conversation.py new file mode 100644 index 00000000..ba8d9d2b --- /dev/null +++ b/tests/structs/test_conversation.py @@ -0,0 +1,567 @@ +import os +import json +import time +import datetime +import yaml +from swarms.structs.conversation import ( + Conversation, + generate_conversation_id, +) + + +def run_all_tests(): + """Run all tests for the Conversation class""" + test_results = [] + + def run_test(test_func): + try: + test_func() + test_results.append(f"✅ {test_func.__name__} passed") + except Exception as e: + test_results.append( + f"❌ {test_func.__name__} failed: {str(e)}" + ) + + def test_basic_initialization(): + """Test basic initialization of Conversation""" + conv = Conversation() + assert conv.id is not None + assert conv.conversation_history is not None + assert isinstance(conv.conversation_history, list) + + # Test with custom ID + custom_id = generate_conversation_id() + conv_with_id = Conversation(id=custom_id) + assert conv_with_id.id == custom_id + + # Test with custom name + conv_with_name = Conversation(name="Test Conversation") + assert conv_with_name.name == "Test Conversation" + + def test_initialization_with_settings(): + """Test initialization with various settings""" + conv = Conversation( + system_prompt="Test system prompt", + time_enabled=True, + autosave=True, + token_count=True, + provider="in-memory", + context_length=4096, + rules="Test rules", + custom_rules_prompt="Custom rules", + user="TestUser:", + save_as_yaml=True, + save_as_json_bool=True, + ) + + # Test all settings + assert conv.system_prompt == "Test system prompt" + assert conv.time_enabled is True + assert conv.autosave is True + assert conv.token_count is True + assert conv.provider == "in-memory" + assert conv.context_length == 4096 + assert conv.rules == "Test rules" + assert conv.custom_rules_prompt == "Custom rules" + assert conv.user == "TestUser:" + assert conv.save_as_yaml is True + assert conv.save_as_json_bool is True + + def test_message_manipulation(): + """Test adding, deleting, and updating messages""" + conv = Conversation() + + # Test adding messages with different content types + conv.add("user", "Hello") # String content + conv.add("assistant", {"response": "Hi"}) # Dict content + conv.add("system", ["Hello", "Hi"]) # List content + + assert len(conv.conversation_history) == 3 + assert isinstance( + conv.conversation_history[1]["content"], dict + ) + assert isinstance( + conv.conversation_history[2]["content"], list + ) + + # Test adding multiple messages + conv.add_multiple( + ["user", "assistant", "system"], + ["Hi", "Hello there", "System message"], + ) + assert len(conv.conversation_history) == 6 + + # Test updating message with different content type + conv.update(0, "user", {"updated": "content"}) + assert isinstance( + conv.conversation_history[0]["content"], dict + ) + + # Test deleting multiple messages + conv.delete(0) + conv.delete(0) + assert len(conv.conversation_history) == 4 + + def test_message_retrieval(): + """Test message retrieval methods""" + conv = Conversation() + + # Add messages in specific order for testing + conv.add("user", "Test message") + conv.add("assistant", "Test response") + conv.add("system", "System message") + + # Test query - note: messages might have system prompt prepended + message = conv.query(0) + assert "Test message" in message["content"] + + # Test search with multiple results + results = conv.search("Test") + assert ( + len(results) >= 2 + ) # At least two messages should contain "Test" + assert any( + "Test message" in str(msg["content"]) for msg in results + ) + assert any( + "Test response" in str(msg["content"]) for msg in results + ) + + # Test get_last_message_as_string + last_message = conv.get_last_message_as_string() + assert "System message" in last_message + + # Test return_messages_as_list + messages_list = conv.return_messages_as_list() + assert ( + len(messages_list) >= 3 + ) # At least our 3 added messages + assert any("Test message" in msg for msg in messages_list) + + # Test return_messages_as_dictionary + messages_dict = conv.return_messages_as_dictionary() + assert ( + len(messages_dict) >= 3 + ) # At least our 3 added messages + assert all(isinstance(m, dict) for m in messages_dict) + assert all( + {"role", "content"} <= set(m.keys()) + for m in messages_dict + ) + + # Test get_final_message and content + assert "System message" in conv.get_final_message() + assert "System message" in conv.get_final_message_content() + + # Test return_all_except_first + remaining_messages = conv.return_all_except_first() + assert ( + len(remaining_messages) >= 2 + ) # At least 2 messages after removing first + + # Test return_all_except_first_string + remaining_string = conv.return_all_except_first_string() + assert isinstance(remaining_string, str) + + def test_saving_loading(): + """Test saving and loading conversation""" + # Test with save_enabled + conv = Conversation( + save_enabled=True, + conversations_dir="./test_conversations", + ) + conv.add("user", "Test save message") + + # Test save_as_json + test_file = os.path.join( + "./test_conversations", "test_conversation.json" + ) + conv.save_as_json(test_file) + assert os.path.exists(test_file) + + # Test load_from_json + new_conv = Conversation() + new_conv.load_from_json(test_file) + assert len(new_conv.conversation_history) == 1 + assert ( + new_conv.conversation_history[0]["content"] + == "Test save message" + ) + + # Test class method load_conversation + loaded_conv = Conversation.load_conversation( + name=conv.id, conversations_dir="./test_conversations" + ) + assert loaded_conv.id == conv.id + + # Cleanup + os.remove(test_file) + os.rmdir("./test_conversations") + + def test_output_formats(): + """Test different output formats""" + conv = Conversation() + conv.add("user", "Test message") + conv.add("assistant", {"response": "Test"}) + + # Test JSON output + json_output = conv.to_json() + assert isinstance(json_output, str) + parsed_json = json.loads(json_output) + assert len(parsed_json) == 2 + + # Test dict output + dict_output = conv.to_dict() + assert isinstance(dict_output, list) + assert len(dict_output) == 2 + + # Test YAML output + yaml_output = conv.to_yaml() + assert isinstance(yaml_output, str) + parsed_yaml = yaml.safe_load(yaml_output) + assert len(parsed_yaml) == 2 + + # Test return_json + json_str = conv.return_json() + assert isinstance(json_str, str) + assert len(json.loads(json_str)) == 2 + + def test_memory_management(): + """Test memory management functions""" + conv = Conversation() + + # Test clear + conv.add("user", "Test message") + conv.clear() + assert len(conv.conversation_history) == 0 + + # Test clear_memory + conv.add("user", "Test message") + conv.clear_memory() + assert len(conv.conversation_history) == 0 + + # Test batch operations + messages = [ + {"role": "user", "content": "Message 1"}, + {"role": "assistant", "content": "Response 1"}, + ] + conv.batch_add(messages) + assert len(conv.conversation_history) == 2 + + # Test truncate_memory_with_tokenizer + if conv.tokenizer: # Only if tokenizer is available + conv.truncate_memory_with_tokenizer() + assert len(conv.conversation_history) > 0 + + def test_conversation_metadata(): + """Test conversation metadata and listing""" + test_dir = "./test_conversations_metadata" + os.makedirs(test_dir, exist_ok=True) + + try: + # Create a conversation with metadata + conv = Conversation( + name="Test Conv", + system_prompt="System", + rules="Rules", + custom_rules_prompt="Custom", + conversations_dir=test_dir, + save_enabled=True, + autosave=True, + ) + + # Add a message to trigger save + conv.add("user", "Test message") + + # Give a small delay for autosave + time.sleep(0.1) + + # List conversations and verify + conversations = Conversation.list_conversations(test_dir) + assert len(conversations) >= 1 + found_conv = next( + ( + c + for c in conversations + if c["name"] == "Test Conv" + ), + None, + ) + assert found_conv is not None + assert found_conv["id"] == conv.id + + finally: + # Cleanup + import shutil + + if os.path.exists(test_dir): + shutil.rmtree(test_dir) + + def test_time_enabled_messages(): + """Test time-enabled messages""" + conv = Conversation(time_enabled=True) + conv.add("user", "Time test") + + # Verify timestamp in message + message = conv.conversation_history[0] + assert "timestamp" in message + assert isinstance(message["timestamp"], str) + + # Verify time in content when time_enabled is True + assert "Time:" in message["content"] + + def test_provider_specific(): + """Test provider-specific functionality""" + # Test in-memory provider + conv_memory = Conversation(provider="in-memory") + conv_memory.add("user", "Test") + assert len(conv_memory.conversation_history) == 1 + + # Test mem0 provider if available + try: + conv_mem0 = Conversation(provider="mem0") + conv_mem0.add("user", "Test") + # Add appropriate assertions based on mem0 behavior + except: + pass # Skip if mem0 is not available + + def test_tool_output(): + """Test tool output handling""" + conv = Conversation() + tool_output = { + "tool_name": "test_tool", + "output": "test result", + } + conv.add_tool_output_to_agent("tool", tool_output) + + assert len(conv.conversation_history) == 1 + assert conv.conversation_history[0]["role"] == "tool" + assert conv.conversation_history[0]["content"] == tool_output + + def test_autosave_functionality(): + """Test autosave functionality and related features""" + test_dir = "./test_conversations_autosave" + os.makedirs(test_dir, exist_ok=True) + + try: + # Test with autosave and save_enabled True + conv = Conversation( + autosave=True, + save_enabled=True, + conversations_dir=test_dir, + name="autosave_test", + ) + + # Add a message and verify it was auto-saved + conv.add("user", "Test autosave message") + save_path = os.path.join(test_dir, f"{conv.id}.json") + + # Give a small delay for autosave to complete + time.sleep(0.1) + + assert os.path.exists( + save_path + ), f"Save file not found at {save_path}" + + # Load the saved conversation and verify content + loaded_conv = Conversation.load_conversation( + name=conv.id, conversations_dir=test_dir + ) + found_message = False + for msg in loaded_conv.conversation_history: + if "Test autosave message" in str(msg["content"]): + found_message = True + break + assert ( + found_message + ), "Message not found in loaded conversation" + + # Clean up first conversation files + if os.path.exists(save_path): + os.remove(save_path) + + # Test with save_enabled=False + conv_no_save = Conversation( + autosave=False, # Changed to False to prevent autosave + save_enabled=False, + conversations_dir=test_dir, + ) + conv_no_save.add("user", "This shouldn't be saved") + save_path_no_save = os.path.join( + test_dir, f"{conv_no_save.id}.json" + ) + time.sleep(0.1) # Give time for potential save + assert not os.path.exists( + save_path_no_save + ), "File should not exist when save_enabled is False" + + finally: + # Cleanup + import shutil + + if os.path.exists(test_dir): + shutil.rmtree(test_dir) + + def test_advanced_message_handling(): + """Test advanced message handling features""" + conv = Conversation() + + # Test adding messages with metadata + metadata = {"timestamp": "2024-01-01", "session_id": "123"} + conv.add("user", "Test with metadata", metadata=metadata) + + # Test batch operations with different content types + messages = [ + {"role": "user", "content": "Message 1"}, + { + "role": "assistant", + "content": {"response": "Complex response"}, + }, + {"role": "system", "content": ["Multiple", "Items"]}, + ] + conv.batch_add(messages) + assert ( + len(conv.conversation_history) == 4 + ) # Including the first message + + # Test message format consistency + for msg in conv.conversation_history: + assert "role" in msg + assert "content" in msg + if "timestamp" in msg: + assert isinstance(msg["timestamp"], str) + + def test_conversation_metadata_handling(): + """Test handling of conversation metadata and attributes""" + test_dir = "./test_conversations_metadata_handling" + os.makedirs(test_dir, exist_ok=True) + + try: + # Test initialization with all optional parameters + conv = Conversation( + name="Test Conv", + system_prompt="System Prompt", + time_enabled=True, + context_length=2048, + rules="Test Rules", + custom_rules_prompt="Custom Rules", + user="CustomUser:", + provider="in-memory", + conversations_dir=test_dir, + save_enabled=True, + ) + + # Verify all attributes are set correctly + assert conv.name == "Test Conv" + assert conv.system_prompt == "System Prompt" + assert conv.time_enabled is True + assert conv.context_length == 2048 + assert conv.rules == "Test Rules" + assert conv.custom_rules_prompt == "Custom Rules" + assert conv.user == "CustomUser:" + assert conv.provider == "in-memory" + + # Test saving and loading preserves metadata + conv.save_as_json() + + # Load using load_conversation + loaded_conv = Conversation.load_conversation( + name=conv.id, conversations_dir=test_dir + ) + + # Verify metadata was preserved + assert loaded_conv.name == "Test Conv" + assert loaded_conv.system_prompt == "System Prompt" + assert loaded_conv.rules == "Test Rules" + + finally: + # Cleanup + import shutil + + shutil.rmtree(test_dir) + + def test_time_enabled_features(): + """Test time-enabled message features""" + conv = Conversation(time_enabled=True) + + # Add message and verify timestamp + conv.add("user", "Time test message") + message = conv.conversation_history[0] + + # Verify timestamp format + assert "timestamp" in message + try: + datetime.datetime.fromisoformat(message["timestamp"]) + except ValueError: + assert False, "Invalid timestamp format" + + # Verify time in content + assert "Time:" in message["content"] + assert ( + datetime.datetime.now().strftime("%Y-%m-%d") + in message["content"] + ) + + def test_provider_specific_features(): + """Test provider-specific features and behaviors""" + # Test in-memory provider + conv_memory = Conversation(provider="in-memory") + conv_memory.add("user", "Test in-memory") + assert len(conv_memory.conversation_history) == 1 + assert ( + "Test in-memory" + in conv_memory.get_last_message_as_string() + ) + + # Test mem0 provider if available + try: + from mem0 import AsyncMemory + + # Skip actual mem0 testing since it requires async + pass + except ImportError: + pass + + # Test invalid provider + invalid_provider = "invalid_provider" + try: + Conversation(provider=invalid_provider) + # If we get here, the provider was accepted when it shouldn't have been + raise AssertionError( + f"Should have raised ValueError for provider '{invalid_provider}'" + ) + except ValueError: + # This is the expected behavior + pass + + # Run all tests + tests = [ + test_basic_initialization, + test_initialization_with_settings, + test_message_manipulation, + test_message_retrieval, + test_saving_loading, + test_output_formats, + test_memory_management, + test_conversation_metadata, + test_time_enabled_messages, + test_provider_specific, + test_tool_output, + test_autosave_functionality, + test_advanced_message_handling, + test_conversation_metadata_handling, + test_time_enabled_features, + test_provider_specific_features, + ] + + for test in tests: + run_test(test) + + # Print results + print("\nTest Results:") + for result in test_results: + print(result) + + +if __name__ == "__main__": + run_all_tests() diff --git a/text_multi_agent_concurrency.py b/text_multi_agent_concurrency.py new file mode 100644 index 00000000..10efd469 --- /dev/null +++ b/text_multi_agent_concurrency.py @@ -0,0 +1,36 @@ +from swarms.structs.agent import Agent +from swarms.structs.ma_blocks import aggregate + + +agents = [ + Agent( + agent_name="Sector-Financial-Analyst", + agent_description="Senior financial analyst at BlackRock.", + system_prompt="You are a financial analyst tasked with optimizing asset allocations for a $50B portfolio. Provide clear, quantitative recommendations for each sector.", + max_loops=1, + model_name="gpt-4o-mini", + max_tokens=3000, + ), + Agent( + agent_name="Sector-Risk-Analyst", + agent_description="Expert risk management analyst.", + system_prompt="You are a risk analyst responsible for advising on risk allocation within a $50B portfolio. Provide detailed insights on risk exposures for each sector.", + max_loops=1, + model_name="gpt-4o-mini", + max_tokens=3000, + ), + Agent( + agent_name="Tech-Sector-Analyst", + agent_description="Technology sector analyst.", + system_prompt="You are a tech sector analyst focused on capital and risk allocations. Provide data-backed insights for the tech sector.", + max_loops=1, + model_name="gpt-4o-mini", + max_tokens=3000, + ), +] + +aggregate( + workers=agents, + task="What is the best sector to invest in?", + type="all", +)