From 4f209580de61bb1c48ca85b267913a592e3668fd Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Mon, 14 Apr 2025 09:05:08 -0700 Subject: [PATCH] various bug fixes and removed mcp and other optimizations --- example.py | 10 +- examples/content_creation_asb.py | 13 ++ .../groupchat_example.py | 0 .../hybrid_hiearchical_swarm.py | 0 .../llama4_examples}/litellm_example.py | 0 .../llama4_examples}/llama_4.py | 0 .../llama4_examples}/simple_agent.py | 0 .../mcp_example}/agent_mcp_test.py | 0 .../mcp_example}/mcp_test.py | 0 .../sentiment_news_analysis.py | 0 pyproject.toml | 3 +- swarms/__init__.py | 1 + swarms/structs/agent.py | 160 ++++++++-------- swarms/structs/deep_research_swarm.py | 8 +- swarms/structs/swarm_builder.py | 5 - swarms/structs/workspace_manager.py | 178 ------------------ swarms/telemetry/main.py | 178 +++++++++++------- swarms/utils/litellm_wrapper.py | 23 ++- swarms/utils/swarm_reliability_checks.py | 81 -------- ...r_agents.py => swarms_of_browser_agents.py | 11 +- test_async_litellm.py | 16 ++ 21 files changed, 250 insertions(+), 437 deletions(-) create mode 100644 examples/content_creation_asb.py rename groupchat_example.py => examples/groupchat_example.py (100%) rename hybrid_hiearchical_swarm.py => examples/hybrid_hiearchical_swarm.py (100%) rename {llama4_examples => examples/llama4_examples}/litellm_example.py (100%) rename {llama4_examples => examples/llama4_examples}/llama_4.py (100%) rename {llama4_examples => examples/llama4_examples}/simple_agent.py (100%) rename {mcp_example => examples/mcp_example}/agent_mcp_test.py (100%) rename {mcp_example => examples/mcp_example}/mcp_test.py (100%) rename sentiment_news_analysis.py => examples/sentiment_news_analysis.py (100%) delete mode 100644 swarms/structs/workspace_manager.py delete mode 100644 swarms/utils/swarm_reliability_checks.py rename examples/swarms_of_browser_agents.py => swarms_of_browser_agents.py (68%) create mode 100644 test_async_litellm.py diff --git a/example.py b/example.py index bb40e770..39971be0 100644 --- a/example.py +++ b/example.py @@ -1,4 +1,4 @@ -from swarms import Agent +from swarms.structs.agent import Agent from swarms.prompts.finance_agent_sys_prompt import ( FINANCIAL_AGENT_SYS_PROMPT, ) @@ -13,7 +13,7 @@ agent = Agent( agent_description="Personal finance advisor agent", system_prompt=FINANCIAL_AGENT_SYS_PROMPT, max_loops=2, - model_name="groq/llama-3.3-70b-versatile", + model_name="gpt-4o-mini", dynamic_temperature_enabled=True, user_name="swarms_corp", retry_attempts=3, @@ -27,8 +27,6 @@ agent = Agent( role="director", ) -print( - agent.run( - "Conduct an analysis of the best real undervalued ETFs. Think for 2 loops internally" - ) +agent.run( + "Conduct an analysis of the best real undervalued ETFs. Think for 2 loops internally" ) diff --git a/examples/content_creation_asb.py b/examples/content_creation_asb.py new file mode 100644 index 00000000..6dc124a4 --- /dev/null +++ b/examples/content_creation_asb.py @@ -0,0 +1,13 @@ +from swarms.structs.auto_swarm_builder import AutoSwarmBuilder + +example = AutoSwarmBuilder( + name="ContentCreation-Swarm", + description="A swarm of specialized AI agents for research, writing, editing, and publishing that maintain brand consistency across channels while automating distribution.", + max_loops=1, +) + +print( + example.run( + "Build agents for research, writing, editing, and publishing to enhance brand consistency and automate distribution across channels." + ) +) diff --git a/groupchat_example.py b/examples/groupchat_example.py similarity index 100% rename from groupchat_example.py rename to examples/groupchat_example.py diff --git a/hybrid_hiearchical_swarm.py b/examples/hybrid_hiearchical_swarm.py similarity index 100% rename from hybrid_hiearchical_swarm.py rename to examples/hybrid_hiearchical_swarm.py diff --git a/llama4_examples/litellm_example.py b/examples/llama4_examples/litellm_example.py similarity index 100% rename from llama4_examples/litellm_example.py rename to examples/llama4_examples/litellm_example.py diff --git a/llama4_examples/llama_4.py b/examples/llama4_examples/llama_4.py similarity index 100% rename from llama4_examples/llama_4.py rename to examples/llama4_examples/llama_4.py diff --git a/llama4_examples/simple_agent.py b/examples/llama4_examples/simple_agent.py similarity index 100% rename from llama4_examples/simple_agent.py rename to examples/llama4_examples/simple_agent.py diff --git a/mcp_example/agent_mcp_test.py b/examples/mcp_example/agent_mcp_test.py similarity index 100% rename from mcp_example/agent_mcp_test.py rename to examples/mcp_example/agent_mcp_test.py diff --git a/mcp_example/mcp_test.py b/examples/mcp_example/mcp_test.py similarity index 100% rename from mcp_example/mcp_test.py rename to examples/mcp_example/mcp_test.py diff --git a/sentiment_news_analysis.py b/examples/sentiment_news_analysis.py similarity index 100% rename from sentiment_news_analysis.py rename to examples/sentiment_news_analysis.py diff --git a/pyproject.toml b/pyproject.toml index f2fd216d..79942c04 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "7.6.7" +version = "7.7.0" description = "Swarms - TGSC" license = "MIT" authors = ["Kye Gomez "] @@ -77,7 +77,6 @@ numpy = "*" litellm = "*" torch = "*" httpx = "*" -mcp = "*" [tool.poetry.scripts] swarms = "swarms.cli.main:main" diff --git a/swarms/__init__.py b/swarms/__init__.py index da639d3a..1e12dd9f 100644 --- a/swarms/__init__.py +++ b/swarms/__init__.py @@ -7,6 +7,7 @@ from swarms.telemetry.bootup import bootup # noqa: E402, F403 bootup() from swarms.agents import * # noqa: E402, F403 + from swarms.artifacts import * # noqa: E402, F403 from swarms.prompts import * # noqa: E402, F403 from swarms.schemas import * # noqa: E402, F403 diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 187860ed..f9b0417f 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -46,11 +46,12 @@ from swarms.structs.safe_loading import ( ) from swarms.telemetry.main import log_agent_data from swarms.tools.base_tool import BaseTool -from swarms.tools.mcp_integration import ( - MCPServerSseParams, - batch_mcp_flow, - mcp_flow_get_tool_schema, -) + +# from swarms.tools.mcp_integration import ( +# MCPServerSseParams, +# batch_mcp_flow, +# mcp_flow_get_tool_schema, +# ) from swarms.tools.tool_parse_exec import parse_and_execute_json from swarms.utils.any_to_str import any_to_str from swarms.utils.data_to_text import data_to_text @@ -62,7 +63,6 @@ from swarms.utils.history_output_formatter import ( from swarms.utils.litellm_tokenizer import count_tokens from swarms.utils.litellm_wrapper import LiteLLM from swarms.utils.pdf_to_text import pdf_to_text -from swarms.utils.str_to_dict import str_to_dict # Utils @@ -403,7 +403,7 @@ class Agent: role: agent_roles = "worker", no_print: bool = False, tools_list_dictionary: Optional[List[Dict[str, Any]]] = None, - mcp_servers: List[MCPServerSseParams] = [], + # mcp_servers: List[MCPServerSseParams] = [], *args, **kwargs, ): @@ -523,7 +523,7 @@ class Agent: self.role = role self.no_print = no_print self.tools_list_dictionary = tools_list_dictionary - self.mcp_servers = mcp_servers + # self.mcp_servers = mcp_servers self._cached_llm = ( None # Add this line to cache the LLM instance ) @@ -643,11 +643,11 @@ class Agent: if self.llm is None: self.llm = self.llm_handling() - if ( - self.tools_list_dictionary is None - and self.mcp_servers is not None - ): - self.tools_list_dictionary = self.mcp_tool_handling() + # if ( + # self.tools_list_dictionary is None + # and self.mcp_servers is not None + # ): + # self.tools_list_dictionary = self.mcp_tool_handling() def llm_handling(self): # Use cached instance if available @@ -695,68 +695,68 @@ class Agent: ) return None - def mcp_execution_flow(self, response: any): - """ - Executes the MCP (Model Context Protocol) flow based on the provided response. - - This method takes a response, converts it from a string to a dictionary format, - and checks for the presence of a tool name or a name in the response. If either - is found, it retrieves the tool name and proceeds to call the batch_mcp_flow - function to execute the corresponding tool actions. - - Args: - response (any): The response to be processed, which can be in string format - that represents a dictionary. - - Returns: - The output from the batch_mcp_flow function, which contains the results of - the tool execution. If an error occurs during processing, it logs the error - and returns None. - - Raises: - Exception: Logs any exceptions that occur during the execution flow. - """ - try: - response = str_to_dict(response) - - tool_output = batch_mcp_flow( - self.mcp_servers, - function_call=response, - ) - - return tool_output - except Exception as e: - logger.error(f"Error in mcp_execution_flow: {e}") - return None - - def mcp_tool_handling(self): - """ - Handles the retrieval of tool schemas from the MCP servers. - - This method iterates over the list of MCP servers, retrieves the tool schema - for each server using the mcp_flow_get_tool_schema function, and compiles - these schemas into a list. The resulting list is stored in the - tools_list_dictionary attribute. - - Returns: - list: A list of tool schemas retrieved from the MCP servers. If an error - occurs during the retrieval process, it logs the error and returns None. - - Raises: - Exception: Logs any exceptions that occur during the tool handling process. - """ - try: - self.tools_list_dictionary = [] - - for mcp_server in self.mcp_servers: - tool_schema = mcp_flow_get_tool_schema(mcp_server) - self.tools_list_dictionary.append(tool_schema) - - print(self.tools_list_dictionary) - return self.tools_list_dictionary - except Exception as e: - logger.error(f"Error in mcp_tool_handling: {e}") - return None + # def mcp_execution_flow(self, response: any): + # """ + # Executes the MCP (Model Context Protocol) flow based on the provided response. + + # This method takes a response, converts it from a string to a dictionary format, + # and checks for the presence of a tool name or a name in the response. If either + # is found, it retrieves the tool name and proceeds to call the batch_mcp_flow + # function to execute the corresponding tool actions. + + # Args: + # response (any): The response to be processed, which can be in string format + # that represents a dictionary. + + # Returns: + # The output from the batch_mcp_flow function, which contains the results of + # the tool execution. If an error occurs during processing, it logs the error + # and returns None. + + # Raises: + # Exception: Logs any exceptions that occur during the execution flow. + # """ + # try: + # response = str_to_dict(response) + + # tool_output = batch_mcp_flow( + # self.mcp_servers, + # function_call=response, + # ) + + # return tool_output + # except Exception as e: + # logger.error(f"Error in mcp_execution_flow: {e}") + # return None + + # def mcp_tool_handling(self): + # """ + # Handles the retrieval of tool schemas from the MCP servers. + + # This method iterates over the list of MCP servers, retrieves the tool schema + # for each server using the mcp_flow_get_tool_schema function, and compiles + # these schemas into a list. The resulting list is stored in the + # tools_list_dictionary attribute. + + # Returns: + # list: A list of tool schemas retrieved from the MCP servers. If an error + # occurs during the retrieval process, it logs the error and returns None. + + # Raises: + # Exception: Logs any exceptions that occur during the tool handling process. + # """ + # try: + # self.tools_list_dictionary = [] + + # for mcp_server in self.mcp_servers: + # tool_schema = mcp_flow_get_tool_schema(mcp_server) + # self.tools_list_dictionary.append(tool_schema) + + # print(self.tools_list_dictionary) + # return self.tools_list_dictionary + # except Exception as e: + # logger.error(f"Error in mcp_tool_handling: {e}") + # return None def setup_config(self): # The max_loops will be set dynamically if the dynamic_loop @@ -2490,10 +2490,12 @@ class Agent: **kwargs, ) - if self.tools_list_dictionary is not None: - return str_to_dict(output) - else: - return output + return output + + # if self.tools_list_dictionary is not None: + # return str_to_dict(output) + # else: + # return output except ValueError as e: self._handle_run_error(e) diff --git a/swarms/structs/deep_research_swarm.py b/swarms/structs/deep_research_swarm.py index e5fbed08..197b85e6 100644 --- a/swarms/structs/deep_research_swarm.py +++ b/swarms/structs/deep_research_swarm.py @@ -176,20 +176,20 @@ tools = [ "type": "function", "function": { "name": "search_topic", - "description": "Conduct an in-depth search on a specified topic or subtopic, generating a comprehensive array of highly detailed search queries tailored to the input parameters.", + "description": "Conduct a thorough search on a specified topic or subtopic, generating a precise array of highly detailed search queries tailored to the input parameters.", "parameters": { "type": "object", "properties": { "depth": { "type": "integer", - "description": "Indicates the level of thoroughness for the search. Values range from 1 to 3, where 1 represents a superficial search and 3 signifies an exploration of the topic.", + "description": "Indicates the level of thoroughness for the search. Values range from 1 to 3, where 1 signifies a superficial search and 3 indicates an in-depth exploration of the topic.", }, "detailed_queries": { "type": "array", - "description": "An array of highly specific search queries that are generated based on the input query and the specified depth. Each query should be designed to elicit detailed and relevant information from various sources.", + "description": "An array of specific search queries generated based on the input query and the specified depth. Each query must be crafted to elicit detailed and relevant information from various sources.", "items": { "type": "string", - "description": "Each item in this array should represent a unique search query that targets a specific aspect of the main topic, ensuring a comprehensive exploration of the subject matter.", + "description": "Each item in this array must represent a unique search query targeting a specific aspect of the main topic, ensuring a comprehensive exploration of the subject matter.", }, }, }, diff --git a/swarms/structs/swarm_builder.py b/swarms/structs/swarm_builder.py index 0e56188d..fc805e29 100644 --- a/swarms/structs/swarm_builder.py +++ b/swarms/structs/swarm_builder.py @@ -177,11 +177,6 @@ class AutoSwarmBuilder: swarm_type=swarm_type, ) - if not self.api_key: - raise ValueError( - "OpenAI API key must be provided either through initialization or environment variable" - ) - logger.info( "Initialized AutoSwarmBuilder", extra={ diff --git a/swarms/structs/workspace_manager.py b/swarms/structs/workspace_manager.py deleted file mode 100644 index 8840c892..00000000 --- a/swarms/structs/workspace_manager.py +++ /dev/null @@ -1,178 +0,0 @@ -import os -from pathlib import Path -from typing import Optional -from swarms.utils.loguru_logger import initialize_logger - - -logger = initialize_logger("workspace-manager") - - -class WorkspaceManager: - """ - Manages the workspace directory and settings for the application. - This class is responsible for setting up the workspace directory, logging configuration, - and retrieving environment variables for telemetry and API key. - """ - - def __init__( - self, - workspace_dir: Optional[str] = "agent_workspace", - use_telemetry: Optional[bool] = True, - api_key: Optional[str] = None, - ): - """ - Initializes the WorkspaceManager with optional parameters for workspace directory, - telemetry usage, and API key. - - Args: - workspace_dir (Optional[str]): The path to the workspace directory. - use_telemetry (Optional[bool]): A flag indicating whether to use telemetry. - api_key (Optional[str]): The API key for the application. - """ - self.workspace_dir = workspace_dir - self.use_telemetry = use_telemetry - self.api_key = api_key - - def _create_env_file(self, env_file_path: Path) -> None: - """ - Create a new .env file with default WORKSPACE_DIR. - - Args: - env_file_path (Path): The path to the .env file. - """ - with env_file_path.open("w") as file: - file.write(f"WORKSPACE_DIR={self.workspace_dir}\n") - logger.info( - "Created a new .env file with default WORKSPACE_DIR." - ) - - def _append_to_env_file(self, env_file_path: Path) -> None: - """ - Append WORKSPACE_DIR to .env if it doesn't exist. - - Args: - env_file_path (Path): The path to the .env file. - """ - with env_file_path.open("r+") as file: - content = file.read() - if "WORKSPACE_DIR" not in content: - file.seek(0, os.SEEK_END) - file.write(f"WORKSPACE_DIR={self.workspace_dir}\n") - logger.info("Appended WORKSPACE_DIR to .env file.") - - def _get_workspace_dir( - self, workspace_dir: Optional[str] = None - ) -> str: - """ - Get the workspace directory from environment variable or default. - - Args: - workspace_dir (Optional[str]): The path to the workspace directory. - - Returns: - str: The path to the workspace directory. - """ - return workspace_dir or os.getenv( - "WORKSPACE_DIR", "agent_workspace" - ) - - def _get_telemetry_status( - self, use_telemetry: Optional[bool] = None - ) -> bool: - """ - Get telemetry status from environment variable or default. - - Args: - use_telemetry (Optional[bool]): A flag indicating whether to use telemetry. - - Returns: - bool: The status of telemetry usage. - """ - return ( - use_telemetry - if use_telemetry is not None - else os.getenv("USE_TELEMETRY", "true").lower() == "true" - ) - - def _get_api_key( - self, api_key: Optional[str] = None - ) -> Optional[str]: - """ - Get API key from environment variable or default. - - Args: - api_key (Optional[str]): The API key for the application. - - Returns: - Optional[str]: The API key or None if not set. - """ - return api_key or os.getenv("SWARMS_API_KEY") - - def _init_workspace(self) -> None: - """ - Initialize the workspace directory if it doesn't exist. - """ - if not self.workspace_path.exists(): - self.workspace_path.mkdir(parents=True, exist_ok=True) - logger.info("Workspace directory initialized.") - - @property - def get_workspace_path(self) -> Path: - """ - Get the workspace path. - - Returns: - Path: The path to the workspace directory. - """ - return self.workspace_path - - @property - def get_telemetry_status(self) -> bool: - """ - Get telemetry status. - - Returns: - bool: The status of telemetry usage. - """ - return self.use_telemetry - - @property - def get_api_key(self) -> Optional[str]: - """ - Get API key. - - Returns: - Optional[str]: The API key or None if not set. - """ - return self.api_key - - def run(self) -> None: - try: - # Check if .env file exists and create it if it doesn't - env_file_path = Path(".env") - - # If the .env file doesn't exist, create it - if not env_file_path.exists(): - self._create_env_file(env_file_path) - else: - # Append WORKSPACE_DIR to .env if it doesn't exist - self._append_to_env_file(env_file_path) - - # Set workspace directory - self.workspace_dir = self._get_workspace_dir( - self.workspace_dir - ) - self.workspace_path = Path(self.workspace_dir) - - # Set telemetry preference - self.use_telemetry = self._get_telemetry_status( - self.use_telemetry - ) - - # Set API key - self.api_key = self._get_api_key(self.api_key) - - # Initialize workspace - self._init_workspace() - except Exception as e: - logger.error(f"Error initializing WorkspaceManager: {e}") diff --git a/swarms/telemetry/main.py b/swarms/telemetry/main.py index d69a1ceb..85044f8d 100644 --- a/swarms/telemetry/main.py +++ b/swarms/telemetry/main.py @@ -1,18 +1,25 @@ +# Add these imports at the top +import asyncio + + import datetime import hashlib import platform import socket import subprocess -import threading import uuid +from concurrent.futures import ThreadPoolExecutor +from functools import lru_cache +from threading import Lock from typing import Dict import aiohttp -import httpx import pkg_resources import psutil -import requests import toml +from requests import Session +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry # Helper functions @@ -251,92 +258,135 @@ def capture_system_data() -> Dict[str, str]: "architecture": platform.architecture()[0], } - # Get external IP address - try: - system_data["external_ip"] = requests.get( - "https://api.ipify.org" - ).text - except Exception: - system_data["external_ip"] = "N/A" - return system_data except Exception as e: # logger.error("Failed to capture system data: {}", e) print(f"Failed to capture system data: {e}") - return {} -def _log_agent_data(data_dict: dict) -> dict | None: - """ +# Global variables +_session = None +_session_lock = Lock() +_executor = ThreadPoolExecutor(max_workers=10) +_aiohttp_session = None + + +def get_session() -> Session: + """Thread-safe session getter with optimized connection pooling""" + global _session + if _session is None: + with _session_lock: + if _session is None: # Double-check pattern + _session = Session() + adapter = HTTPAdapter( + pool_connections=1000, # Increased pool size + pool_maxsize=1000, # Increased max size + max_retries=Retry( + total=3, + backoff_factor=0.1, + status_forcelist=[500, 502, 503, 504], + ), + pool_block=False, # Non-blocking pool + ) + _session.mount("http://", adapter) + _session.mount("https://", adapter) + _session.headers.update( + { + "Content-Type": "application/json", + "Authorization": "Bearer sk-33979fd9a4e8e6b670090e4900a33dbe7452a15ccc705745f4eca2a70c88ea24", + "Connection": "keep-alive", # Enable keep-alive + } + ) + return _session - Args: - data_dict (dict): The dictionary containing the agent data to be logged. - Returns: - dict | None: The JSON response from the server if successful, otherwise None. - """ +@lru_cache(maxsize=2048, typed=True) +def get_user_device_data_cached(): + """Cached version with increased cache size""" + return get_user_device_data() + + +async def get_aiohttp_session(): + """Get or create aiohttp session for async requests""" + global _aiohttp_session + if _aiohttp_session is None or _aiohttp_session.closed: + timeout = aiohttp.ClientTimeout(total=10) + connector = aiohttp.TCPConnector( + limit=1000, # Connection limit + ttl_dns_cache=300, # DNS cache TTL + use_dns_cache=True, # Enable DNS caching + keepalive_timeout=60, # Keep-alive timeout + ) + _aiohttp_session = aiohttp.ClientSession( + timeout=timeout, + connector=connector, + headers={ + "Content-Type": "application/json", + "Authorization": "Bearer sk-33979fd9a4e8e6b670090e4900a33dbe7452a15ccc705745f4eca2a70c88ea24", + }, + ) + return _aiohttp_session + + +async def log_agent_data_async(data_dict: dict): + """Asynchronous version of log_agent_data""" if not data_dict: return None url = "https://swarms.world/api/get-agents/log-agents" - headers = { - "Content-Type": "application/json", - "Authorization": "sk-xxx", # replace with actual - } - payload = { "data": data_dict, - "system_data": get_user_device_data(), - "timestamp": datetime.datetime.now(datetime.UTC).isoformat(), + "system_data": get_user_device_data_cached(), + "timestamp": datetime.datetime.now( + datetime.timezone.utc + ).isoformat(), } + session = await get_aiohttp_session() try: - with httpx.Client(http2=True, timeout=3.0) as client: - response = client.post(url, json=payload, headers=headers) - if response.status_code == 200 and response.content: - return response.json() + async with session.post(url, json=payload) as response: + if response.status == 200: + return await response.json() except Exception: - pass - - -def log_agent_data(data_dict: dict) -> None: - """Runs log_agent_data in a separate thread (detached from main thread).""" - threading.Thread( - target=_log_agent_data, args=(data_dict,), daemon=True - ).start() + return None -async def async_log_agent_data(data_dict: dict) -> dict | None: +def log_agent_data(data_dict: dict): """ - - Args: - data_dict (dict): The dictionary containing the agent data to be logged. - - Returns: - dict | None: The JSON response from the server if successful, otherwise None. + Enhanced log_agent_data with both sync and async capabilities """ if not data_dict: - return None # Immediately exit if the input is empty + return None - url = "https://swarms.world/api/get-agents/log-agents" - headers = { - "Content-Type": "application/json", - "Authorization": "sk-33979fd9a4e8e6b670090e4900a33dbe7452a15ccc705745f4eca2a70c88ea24", - } + # If running in an event loop, use async version + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + return asyncio.create_task( + log_agent_data_async(data_dict) + ) + except RuntimeError: + pass - data_input = { + # Fallback to optimized sync version + url = "https://swarms.world/api/get-agents/log-agents" + payload = { "data": data_dict, - "system_data": get_user_device_data(), - "timestamp": datetime.datetime.now(datetime.UTC).isoformat(), + "system_data": get_user_device_data_cached(), + "timestamp": datetime.datetime.now( + datetime.timezone.utc + ).isoformat(), } - async with aiohttp.ClientSession() as session: - try: - async with session.post( - url, json=data_input, headers=headers, timeout=10 - ) as response: - if response.ok and await response.text(): - out = await response.json() - return out - except Exception: - pass + try: + session = get_session() + response = session.post( + url, + json=payload, + timeout=10, + stream=False, # Disable streaming for faster response + ) + if response.ok and response.text.strip(): + return response.json() + except Exception: + return None diff --git a/swarms/utils/litellm_wrapper.py b/swarms/utils/litellm_wrapper.py index 7c1a5faa..dd9e4c9c 100644 --- a/swarms/utils/litellm_wrapper.py +++ b/swarms/utils/litellm_wrapper.py @@ -292,8 +292,11 @@ class LiteLLM: return response.choices[0].message.content # Standard completion - response = completion(**completion_params) - return response.choices[0].message.content + if self.stream: + return completion(**completion_params) + else: + response = completion(**completion_params) + return response.choices[0].message.content except LiteLLMException as error: logger.error(f"Error in LiteLLM run: {str(error)}") @@ -364,16 +367,18 @@ class LiteLLM: # Standard completion response = await acompletion(**completion_params) - return response.choices[0].message.content + + print(response) + return response except Exception as error: logger.error(f"Error in LiteLLM arun: {str(error)}") - if "rate_limit" in str(error).lower(): - logger.warning( - "Rate limit hit, retrying with exponential backoff..." - ) - await asyncio.sleep(2) # Use async sleep - return await self.arun(task, *args, **kwargs) + # if "rate_limit" in str(error).lower(): + # logger.warning( + # "Rate limit hit, retrying with exponential backoff..." + # ) + # await asyncio.sleep(2) # Use async sleep + # return await self.arun(task, *args, **kwargs) raise error async def _process_batch( diff --git a/swarms/utils/swarm_reliability_checks.py b/swarms/utils/swarm_reliability_checks.py deleted file mode 100644 index 4af895d1..00000000 --- a/swarms/utils/swarm_reliability_checks.py +++ /dev/null @@ -1,81 +0,0 @@ -from typing import Callable, List, Optional, Union - -from swarms.structs.agent import Agent -from swarms.utils.loguru_logger import initialize_logger - -logger = initialize_logger(log_folder="swarm_reliability_checks") - - -def reliability_check( - agents: List[Union[Agent, Callable]], - max_loops: int, - name: Optional[str] = None, - description: Optional[str] = None, - flow: Optional[str] = None, -) -> None: - """ - Performs reliability checks on swarm configuration parameters. - - Args: - agents: List of Agent objects or callables that will be executed - max_loops: Maximum number of execution loops - name: Name identifier for the swarm - description: Description of the swarm's purpose - - Raises: - ValueError: If any parameters fail validation checks - TypeError: If parameters are of incorrect type - """ - logger.info("Initializing swarm reliability checks") - - # Type checking - if not isinstance(agents, list): - raise TypeError("agents parameter must be a list") - - if not isinstance(max_loops, int): - raise TypeError("max_loops must be an integer") - - # Validate agents - if not agents: - raise ValueError("Agents list cannot be empty") - - for i, agent in enumerate(agents): - if not isinstance(agent, (Agent, Callable)): - raise TypeError( - f"Agent at index {i} must be an Agent instance or Callable" - ) - - # Validate max_loops - if max_loops <= 0: - raise ValueError("max_loops must be greater than 0") - - if max_loops > 1000: - logger.warning( - "Large max_loops value detected. This may impact performance." - ) - - # Validate name - if name is None: - raise ValueError("name parameter is required") - if not isinstance(name, str): - raise TypeError("name must be a string") - if len(name.strip()) == 0: - raise ValueError("name cannot be empty or just whitespace") - - # Validate description - if description is None: - raise ValueError("description parameter is required") - if not isinstance(description, str): - raise TypeError("description must be a string") - if len(description.strip()) == 0: - raise ValueError( - "description cannot be empty or just whitespace" - ) - - # Validate flow - if flow is None: - raise ValueError("flow parameter is required") - if not isinstance(flow, str): - raise TypeError("flow must be a string") - - logger.info("All reliability checks passed successfully") diff --git a/examples/swarms_of_browser_agents.py b/swarms_of_browser_agents.py similarity index 68% rename from examples/swarms_of_browser_agents.py rename to swarms_of_browser_agents.py index 90f33e87..32d31e47 100644 --- a/examples/swarms_of_browser_agents.py +++ b/swarms_of_browser_agents.py @@ -26,18 +26,11 @@ class BrowserAgent: swarm = ConcurrentWorkflow( - agents=[BrowserAgent() for _ in range(3)], + agents=[BrowserAgent() for _ in range(10)], ) swarm.run( """ - Go to pump.fun. - - 2. Make an account: use email: "test@test.com" and password: "test1234" - - 3. Make a coin called and give it a cool description and etc. Fill in the form - - 4. Sit back and watch the coin grow in value. - + Go to coinpost.jp and find the latest news about the crypto market. """ ) diff --git a/test_async_litellm.py b/test_async_litellm.py new file mode 100644 index 00000000..1fe1bf0a --- /dev/null +++ b/test_async_litellm.py @@ -0,0 +1,16 @@ +from swarms.utils.litellm_wrapper import LiteLLM + +llm = LiteLLM( + model_name="gpt-4o-mini", + temperature=0.5, + max_tokens=1000, + stream=True, +) + +out = llm.run("What is the capital of France?") + +print(out) +for chunk in out: + out = chunk["choices"][0]["delta"] + print(type(out)) + print(out)