diff --git a/async_agents.py b/async_agents.py new file mode 100644 index 00000000..0d9353db --- /dev/null +++ b/async_agents.py @@ -0,0 +1,56 @@ +import os + +from dotenv import load_dotenv +from swarm_models import OpenAIChat + +from swarms import Agent +from swarms.prompts.finance_agent_sys_prompt import ( + FINANCIAL_AGENT_SYS_PROMPT, +) +from async_executor import HighSpeedExecutor + +load_dotenv() + +# Get the OpenAI API key from the environment variable +api_key = os.getenv("OPENAI_API_KEY") + +# Create an instance of the OpenAIChat class +model = OpenAIChat( + openai_api_key=api_key, model_name="gpt-4o-mini", temperature=0.1 +) + +# Initialize the agent +agent = Agent( + agent_name="Financial-Analysis-Agent", + system_prompt=FINANCIAL_AGENT_SYS_PROMPT, + llm=model, + max_loops=1, + # autosave=True, + # dashboard=False, + # verbose=True, + # dynamic_temperature_enabled=True, + # saved_state_path="finance_agent.json", + # user_name="swarms_corp", + # retry_attempts=1, + # context_length=200000, + # return_step_meta=True, + # output_type="json", # "json", "dict", "csv" OR "string" soon "yaml" and + # auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task + # # artifacts_on=True, + # artifacts_output_path="roth_ira_report", + # artifacts_file_extension=".txt", + # max_tokens=8000, + # return_history=True, +) + + +def execute_agent( + task: str = "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria. Create a report on this question.", +): + return agent.run(task) + + +executor = HighSpeedExecutor() +results = executor.run(execute_agent, 2) + +print(results) diff --git a/async_executor.py b/async_executor.py new file mode 100644 index 00000000..e9fcfa4e --- /dev/null +++ b/async_executor.py @@ -0,0 +1,131 @@ +import asyncio +import multiprocessing as mp +import time +from functools import partial +from typing import Any, Dict, Union + + +class HighSpeedExecutor: + def __init__(self, num_processes: int = None): + """ + Initialize the executor with configurable number of processes. + If num_processes is None, it uses CPU count. + """ + self.num_processes = num_processes or mp.cpu_count() + + async def _worker( + self, + queue: asyncio.Queue, + func: Any, + *args: Any, + **kwargs: Any, + ): + """Async worker that processes tasks from the queue""" + while True: + try: + # Non-blocking get from queue + await queue.get() + await asyncio.get_event_loop().run_in_executor( + None, partial(func, *args, **kwargs) + ) + queue.task_done() + except asyncio.CancelledError: + break + + async def _distribute_tasks( + self, num_tasks: int, queue: asyncio.Queue + ): + """Distribute tasks across the queue""" + for i in range(num_tasks): + await queue.put(i) + + async def execute_batch( + self, + func: Any, + num_executions: int, + *args: Any, + **kwargs: Any, + ) -> Dict[str, Union[int, float]]: + """ + Execute the given function multiple times concurrently. + + Args: + func: The function to execute + num_executions: Number of times to execute the function + *args, **kwargs: Arguments to pass to the function + + Returns: + A dictionary containing the number of executions, duration, and executions per second. + """ + queue = asyncio.Queue() + + # Create worker tasks + workers = [ + asyncio.create_task( + self._worker(queue, func, *args, **kwargs) + ) + for _ in range(self.num_processes) + ] + + # Start timing + start_time = time.perf_counter() + + # Distribute tasks + await self._distribute_tasks(num_executions, queue) + + # Wait for all tasks to complete + await queue.join() + + # Cancel workers + for worker in workers: + worker.cancel() + + # Wait for all workers to finish + await asyncio.gather(*workers, return_exceptions=True) + + end_time = time.perf_counter() + duration = end_time - start_time + + return { + "executions": num_executions, + "duration": duration, + "executions_per_second": num_executions / duration, + } + + def run( + self, + func: Any, + num_executions: int, + *args: Any, + **kwargs: Any, + ): + return asyncio.run( + self.execute_batch(func, num_executions, *args, **kwargs) + ) + + +# def example_function(x: int = 0) -> int: +# """Example function to execute""" +# return x * x + + +# async def main(): +# # Create executor with number of CPU cores +# executor = HighSpeedExecutor() + +# # Execute the function 1000 times +# result = await executor.execute_batch( +# example_function, num_executions=1000, x=42 +# ) + +# print( +# f"Completed {result['executions']} executions in {result['duration']:.2f} seconds" +# ) +# print( +# f"Rate: {result['executions_per_second']:.2f} executions/second" +# ) + + +# if __name__ == "__main__": +# # Run the async main function +# asyncio.run(main()) diff --git a/groupchat_new.py b/groupchat_new.py new file mode 100644 index 00000000..69c424d4 --- /dev/null +++ b/groupchat_new.py @@ -0,0 +1,244 @@ +import os +import asyncio +from pydantic import BaseModel, Field +from typing import List, Dict, Any +from swarms import Agent +from swarm_models import OpenAIChat +from dotenv import load_dotenv +from swarms.utils.formatter import formatter + +# Load environment variables +load_dotenv() + +# Get OpenAI API key +api_key = os.getenv("OPENAI_API_KEY") + + +# Define Pydantic schema for agent outputs +class AgentOutput(BaseModel): + """Schema for capturing the output of each agent.""" + + agent_name: str = Field(..., description="The name of the agent") + message: str = Field( + ..., + description="The agent's response or contribution to the group chat", + ) + metadata: Dict[str, Any] = Field( + default_factory=dict, + description="Additional metadata about the agent's response", + ) + + +class GroupChat: + """ + GroupChat class to enable multiple agents to communicate in an asynchronous group chat. + Each agent is aware of all other agents, every message exchanged, and the social context. + """ + + def __init__( + self, + name: str, + description: str, + agents: List[Agent], + max_loops: int = 1, + ): + """ + Initialize the GroupChat. + + Args: + name (str): Name of the group chat. + description (str): Description of the purpose of the group chat. + agents (List[Agent]): A list of agents participating in the chat. + max_loops (int): Maximum number of loops to run through all agents. + """ + self.name = name + self.description = description + self.agents = agents + self.max_loops = max_loops + self.chat_history = ( + [] + ) # Stores all messages exchanged in the chat + + formatter.print_panel( + f"Initialized GroupChat '{self.name}' with {len(self.agents)} agents. Max loops: {self.max_loops}", + title="Groupchat Swarm", + ) + + async def _agent_conversation( + self, agent: Agent, input_message: str + ) -> AgentOutput: + """ + Facilitate a single agent's response to the chat. + + Args: + agent (Agent): The agent responding. + input_message (str): The message triggering the response. + + Returns: + AgentOutput: The agent's response captured in a structured format. + """ + formatter.print_panel( + f"Agent '{agent.agent_name}' is responding to the message: {input_message}", + title="Groupchat Swarm", + ) + response = await asyncio.to_thread(agent.run, input_message) + + output = AgentOutput( + agent_name=agent.agent_name, + message=response, + metadata={"context_length": agent.context_length}, + ) + # logger.debug(f"Agent '{agent.agent_name}' response: {response}") + return output + + async def _run(self, initial_message: str) -> List[AgentOutput]: + """ + Execute the group chat asynchronously, looping through all agents up to max_loops. + + Args: + initial_message (str): The initial message to start the chat. + + Returns: + List[AgentOutput]: The responses of all agents across all loops. + """ + formatter.print_panel( + f"Starting group chat '{self.name}' with initial message: {initial_message}", + title="Groupchat Swarm", + ) + self.chat_history.append( + {"sender": "System", "message": initial_message} + ) + + outputs = [] + for loop in range(self.max_loops): + formatter.print_panel( + f"Group chat loop {loop + 1}/{self.max_loops}", + title="Groupchat Swarm", + ) + + for agent in self.agents: + # Create a custom input message for each agent, sharing the chat history and social context + input_message = ( + f"Chat History:\n{self._format_chat_history()}\n\n" + f"Participants:\n" + + "\n".join( + [ + f"- {a.agent_name}: {a.system_prompt}" + for a in self.agents + ] + ) + + f"\n\nNew Message: {initial_message}\n\n" + f"You are '{agent.agent_name}'. Remember to keep track of the social context, who is speaking, " + f"and respond accordingly based on your role: {agent.system_prompt}." + ) + + # Collect agent's response + output = await self._agent_conversation( + agent, input_message + ) + outputs.append(output) + + # Update chat history with the agent's response + self.chat_history.append( + { + "sender": agent.agent_name, + "message": output.message, + } + ) + + formatter.print_panel( + "Group chat completed. All agent responses captured.", + title="Groupchat Swarm", + ) + return outputs + + def run(self, task: str, *args, **kwargs): + return asyncio.run(self.run(task, *args, **kwargs)) + + def _format_chat_history(self) -> str: + """ + Format the chat history for agents to understand the context. + + Returns: + str: The formatted chat history as a string. + """ + return "\n".join( + [ + f"{entry['sender']}: {entry['message']}" + for entry in self.chat_history + ] + ) + + def __str__(self) -> str: + """String representation of the group chat's outputs.""" + return self._format_chat_history() + + def to_json(self) -> str: + """JSON representation of the group chat's outputs.""" + return [ + {"sender": entry["sender"], "message": entry["message"]} + for entry in self.chat_history + ] + + +# Example Usage +if __name__ == "__main__": + + load_dotenv() + + # Get the OpenAI API key from the environment variable + api_key = os.getenv("OPENAI_API_KEY") + + # Create an instance of the OpenAIChat class + model = OpenAIChat( + openai_api_key=api_key, + model_name="gpt-4o-mini", + temperature=0.1, + ) + + # Example agents + agent1 = Agent( + agent_name="Financial-Analysis-Agent", + system_prompt="You are a financial analyst specializing in investment strategies.", + llm=model, + max_loops=1, + autosave=False, + dashboard=False, + verbose=True, + dynamic_temperature_enabled=True, + user_name="swarms_corp", + retry_attempts=1, + context_length=200000, + output_type="string", + streaming_on=False, + ) + + agent2 = Agent( + agent_name="Tax-Adviser-Agent", + system_prompt="You are a tax adviser who provides clear and concise guidance on tax-related queries.", + llm=model, + max_loops=1, + autosave=False, + dashboard=False, + verbose=True, + dynamic_temperature_enabled=True, + user_name="swarms_corp", + retry_attempts=1, + context_length=200000, + output_type="string", + streaming_on=False, + ) + + # Create group chat + group_chat = GroupChat( + name="Financial Discussion", + description="A group chat for financial analysis and tax advice.", + agents=[agent1, agent2], + ) + + # Run the group chat + asyncio.run( + group_chat.run( + "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria? What do you guys think?" + ) + ) diff --git a/example_async_vs_multithread.py b/new_features_examples/example_async_vs_multithread.py similarity index 98% rename from example_async_vs_multithread.py rename to new_features_examples/example_async_vs_multithread.py index f547abc8..25d514aa 100644 --- a/example_async_vs_multithread.py +++ b/new_features_examples/example_async_vs_multithread.py @@ -1,6 +1,5 @@ import os import asyncio -import threading from swarms import Agent from swarm_models import OpenAIChat import time @@ -40,18 +39,21 @@ agent = Agent( streaming_on=False, ) + # Function to measure time and memory usage def measure_time_and_memory(func): def wrapper(*args, **kwargs): start_time = time.time() result = func(*args, **kwargs) end_time = time.time() - memory_usage = psutil.Process().memory_info().rss / 1024 ** 2 + memory_usage = psutil.Process().memory_info().rss / 1024**2 print(f"Time taken: {end_time - start_time} seconds") print(f"Memory used: {memory_usage} MB") return result + return wrapper + # Function to run the agent asynchronously @measure_time_and_memory async def run_agent_async(): @@ -61,11 +63,13 @@ async def run_agent_async(): ) ) + # Function to run the agent on another thread @measure_time_and_memory def run_agent_thread(): asyncio.run(run_agent_async()) + # Run the agent asynchronously and on another thread to test the speed asyncio.run(run_agent_async()) run_agent_thread() diff --git a/sequential_worflow_test.py b/new_features_examples/sequential_worflow_test.py similarity index 100% rename from sequential_worflow_test.py rename to new_features_examples/sequential_worflow_test.py diff --git a/sequential_workflow.py b/new_features_examples/sequential_workflow.py similarity index 100% rename from sequential_workflow.py rename to new_features_examples/sequential_workflow.py diff --git a/pyproject.toml b/pyproject.toml index d8d06c61..bb3d5043 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "6.2.9" +version = "6.3.6" description = "Swarms - Pytorch" license = "MIT" authors = ["Kye Gomez "] @@ -37,6 +37,14 @@ keywords = [ "Generative AI", "Agent Marketplace", "Agent Store", + "quant", + "finance", + "algorithmic trading", + "portfolio optimization", + "risk management", + "financial modeling", + "machine learning for finance", + "natural language processing for finance", ] classifiers = [ "Development Status :: 4 - Beta", @@ -52,27 +60,18 @@ python = ">=3.10,<4.0" torch = ">=2.1.1,<3.0" transformers = ">= 4.39.0, <5.0.0" asyncio = ">=3.4.3,<4.0" -langchain-community = "0.0.29" -langchain-experimental = "0.0.55" -backoff = "2.2.1" toml = "*" pypdf = "4.3.1" -loguru = "0.7.2" +loguru = "*" pydantic = "2.8.2" -tenacity = "8.5.0" -Pillow = "10.4.0" +tenacity = "*" psutil = "*" sentry-sdk = {version = "*", extras = ["http"]} # Updated here python-dotenv = "*" PyYAML = "*" docstring_parser = "0.16" -fastapi = "*" -openai = ">=1.30.1,<2.0" -termcolor = "*" tiktoken = "*" networkx = "*" -swarms-memory = "*" -black = "*" aiofiles = "*" swarm-models = "*" clusterops = "*" @@ -96,9 +95,7 @@ mypy-protobuf = "^3.0.0" [tool.poetry.group.test.dependencies] pytest = "^8.1.1" -termcolor = "^2.4.0" pandas = "^2.2.2" -fastapi = ">=0.110.1,<0.116.0" [tool.ruff] line-length = 70 diff --git a/requirements.txt b/requirements.txt index d422222b..ca9fdcdd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,21 +2,16 @@ torch>=2.1.1,<3.0 transformers>=4.39.0,<5.0.0 asyncio>=3.4.3,<4.0 -langchain-community==0.0.28 -langchain-experimental==0.0.55 -backoff==2.2.1 toml pypdf==4.3.1 ratelimit==2.2.1 loguru==0.7.2 pydantic==2.8.2 -tenacity==8.5.0 -Pillow==10.4.0 +tenacity rich psutil sentry-sdk python-dotenv -opencv-python-headless PyYAML docstring_parser==0.16 black>=23.1,<25.0 @@ -26,12 +21,8 @@ types-pytz>=2023.3,<2025.0 types-chardet>=5.0.4.6 mypy-protobuf>=3.0.0 pytest>=8.1.1 -termcolor>=2.4.0 pandas>=2.2.2 -fastapi>=0.110.1 networkx -swarms-memory -pre-commit aiofiles swarm-models clusterops diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index a4c04a16..d1f3a745 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -24,8 +24,6 @@ import toml import yaml from pydantic import BaseModel from swarm_models.tiktoken_wrapper import TikTokenizer -from termcolor import colored - from swarms.agents.ape_agent import auto_generate_prompt from swarms.prompts.agent_system_prompts import AGENT_SYSTEM_PROMPT_3 from swarms.prompts.multi_modal_autonomous_instruction_prompt import ( @@ -671,11 +669,8 @@ class Agent: return self.stopping_condition(response) return False except Exception as error: - print( - colored( - f"Error checking stopping condition: {error}", - "red", - ) + logger.error( + f"Error checking stopping condition: {error}" ) def dynamic_temperature(self): @@ -688,21 +683,20 @@ class Agent: try: if hasattr(self.llm, "temperature"): # Randomly change the temperature attribute of self.llm object - logger.info("Enabling Random Dyamic Temperature") self.llm.temperature = random.uniform(0.0, 1.0) else: # Use a default temperature self.llm.temperature = 0.5 except Exception as error: - print( - colored( - f"Error dynamically changing temperature: {error}" - ) + logger.error( + f"Error dynamically changing temperature: {error}" ) def print_dashboard(self): """Print dashboard""" - print(colored("Initializing Agent Dashboard...", "yellow")) + formatter.print_panel( + f"Initializing Agent: {self.agent_name}" + ) data = self.to_dict() @@ -710,22 +704,19 @@ class Agent: # data = json.dumps(data, indent=4) # json_data = json.dumps(data, indent=4) - print( - colored( - f""" - Agent Dashboard - -------------------------------------------- + formatter.print_panel( + f""" + Agent Dashboard + -------------------------------------------- - Agent {self.agent_name} is initializing for {self.max_loops} with the following configuration: - ---------------------------------------- + Agent {self.agent_name} is initializing for {self.max_loops} with the following configuration: + ---------------------------------------- - Agent Configuration: - Configuration: {data} + Agent Configuration: + Configuration: {data} - ---------------------------------------- - """, - "green", - ) + ---------------------------------------- + """, ) def loop_count_print( @@ -737,7 +728,7 @@ class Agent: loop_count (_type_): _description_ max_loops (_type_): _description_ """ - print(colored(f"\nLoop {loop_count} of {max_loops}", "cyan")) + logger.info(f"\nLoop {loop_count} of {max_loops}") print("\n") # Check parameters @@ -761,8 +752,8 @@ class Agent: self, task: Optional[str] = None, img: Optional[str] = None, - is_last: bool = False, - print_task: bool = False, + is_last: Optional[bool] = False, + print_task: Optional[bool] = False, *args, **kwargs, ) -> Any: @@ -960,7 +951,7 @@ class Agent: if self.interactive: logger.info("Interactive mode enabled.") - user_input = colored(input("You: "), "red") + user_input = formatter.print_panel(input("You: ")) # User-defined exit command if ( @@ -1060,7 +1051,7 @@ class Agent: except Exception as error: logger.info( - f"Error running agent: {error} optimize your input parameters" + f"Error running agent: {error} optimize your input parameter" ) raise error @@ -1261,7 +1252,7 @@ class Agent: logger.info(f"Running bulk tasks: {inputs}") return [self.run(**input_data) for input_data in inputs] except Exception as error: - print(colored(f"Error running bulk run: {error}", "red")) + logger.info(f"Error running bulk run: {error}", "red") def save(self) -> None: """Save the agent history to a file. @@ -1438,9 +1429,7 @@ class Agent: with open(file_path, "w") as f: yaml.dump(self.to_dict(), f) except Exception as error: - logger.error( - colored(f"Error saving agent to YAML: {error}", "red") - ) + logger.error(f"Error saving agent to YAML: {error}") raise error def get_llm_parameters(self): @@ -1505,7 +1494,7 @@ class Agent: role=self.user_name, content=data ) except Exception as error: - print(colored(f"Error ingesting docs: {error}", "red")) + logger.info(f"Error ingesting docs: {error}", "red") def ingest_pdf(self, pdf: str): """Ingest the pdf into the memory @@ -1520,7 +1509,7 @@ class Agent: role=self.user_name, content=text ) except Exception as error: - print(colored(f"Error ingesting pdf: {error}", "red")) + logger.info(f"Error ingesting pdf: {error}", "red") def receieve_message(self, name: str, message: str): """Receieve a message""" @@ -1604,12 +1593,10 @@ class Agent: role=self.user_name, content=text ) except Exception as error: - print( - colored( - f"Error getting docs from doc folders: {error}", - "red", - ) + logger.error( + f"Error getting docs from doc folders: {error}" ) + raise error def check_end_session_agentops(self): if self.agent_ops_on is True: @@ -1629,7 +1616,8 @@ class Agent: try: # Query the long term memory if self.long_term_memory is not None: - logger.info(f"Querying long term memory for: {task}") + formatter.print_panel(f"Querying RAG for: {task}") + memory_retrieval = self.long_term_memory.query( task, *args, **kwargs ) @@ -1638,15 +1626,15 @@ class Agent: f"Documents Available: {str(memory_retrieval)}" ) - # Count the tokens - memory_token_count = self.tokenizer.count_tokens( - memory_retrieval - ) - if memory_token_count > self.memory_chunk_size: - # Truncate the memory by the memory chunk size - memory_retrieval = self.truncate_string_by_tokens( - memory_retrieval, self.memory_chunk_size - ) + # # Count the tokens + # memory_token_count = self.tokenizer.count_tokens( + # memory_retrieval + # ) + # if memory_token_count > self.memory_chunk_size: + # # Truncate the memory by the memory chunk size + # memory_retrieval = self.truncate_string_by_tokens( + # memory_retrieval, self.memory_chunk_size + # ) self.short_memory.add( role="Database", diff --git a/swarms/structs/base_workflow.py b/swarms/structs/base_workflow.py index b75bfe2c..4107042a 100644 --- a/swarms/structs/base_workflow.py +++ b/swarms/structs/base_workflow.py @@ -1,8 +1,7 @@ import json from typing import Any, Dict, List, Optional -from termcolor import colored - +from swarms.utils.formatter import formatter from swarms.structs.agent import Agent from swarms.structs.base_structure import BaseStructure from swarms.structs.task import Task @@ -132,9 +131,10 @@ class BaseWorkflow(BaseStructure): for task in self.tasks: task.result = None except Exception as error: - print( - colored(f"Error resetting workflow: {error}", "red"), + formatter.print_panel( + f"Error resetting workflow: {error}" ) + raise error def get_task_results(self) -> Dict[str, Any]: """ @@ -148,10 +148,8 @@ class BaseWorkflow(BaseStructure): task.description: task.result for task in self.tasks } except Exception as error: - print( - colored( - f"Error getting task results: {error}", "red" - ), + formatter.print_panel( + f"Error getting task results: {error}" ) def remove_task(self, task: str) -> None: @@ -163,12 +161,10 @@ class BaseWorkflow(BaseStructure): if task.description != task ] except Exception as error: - print( - colored( - f"Error removing task from workflow: {error}", - "red", - ), + formatter.print_panel( + f"Error removing task from workflow: {error}", ) + raise error def update_task(self, task: str, **updates) -> None: """ @@ -203,11 +199,9 @@ class BaseWorkflow(BaseStructure): f"Task {task} not found in workflow." ) except Exception as error: - print( - colored( - f"Error updating task in workflow: {error}", "red" - ), - ) + formatter.print_panel( + f"Error updating task in workflow: {error}" + ), def delete_task(self, task: str) -> None: """ @@ -240,12 +234,10 @@ class BaseWorkflow(BaseStructure): f"Task {task} not found in workflow." ) except Exception as error: - print( - colored( - f"Error deleting task from workflow: {error}", - "red", - ), + formatter.print_panel( + f"Error deleting task from workflow: {error}", ) + raise error def save_workflow_state( self, @@ -287,23 +279,18 @@ class BaseWorkflow(BaseStructure): } json.dump(state, f, indent=4) except Exception as error: - print( - colored( - f"Error saving workflow state: {error}", - "red", - ) + formatter.print_panel( + f"Error saving workflow state: {error}", ) + raise error def add_objective_to_workflow(self, task: str, **kwargs) -> None: """Adds an objective to the workflow.""" try: - print( - colored( - """ - Adding Objective to Workflow...""", - "green", - attrs=["bold", "underline"], - ) + formatter.print_panel( + """ + Adding Objective to Workflow...""", + "green", ) task = Task( @@ -314,12 +301,10 @@ class BaseWorkflow(BaseStructure): ) self.tasks.append(task) except Exception as error: - print( - colored( - f"Error adding objective to workflow: {error}", - "red", - ) + formatter.print_panel( + f"Error adding objective to workflow: {error}", ) + raise error def load_workflow_state( self, filepath: str = None, **kwargs @@ -359,11 +344,8 @@ class BaseWorkflow(BaseStructure): ) self.tasks.append(task) except Exception as error: - print( - colored( - f"Error loading workflow state: {error}", - "red", - ) + formatter.print_panel( + f"Error loading workflow state: {error}", ) def workflow_dashboard(self, **kwargs) -> None: @@ -383,25 +365,21 @@ class BaseWorkflow(BaseStructure): >>> workflow.workflow_dashboard() """ - print( - colored( - f""" - Sequential Workflow Dashboard - -------------------------------- - Name: {self.name} - Description: {self.description} - task_pool: {len(self.task_pool)} - Max Loops: {self.max_loops} - Autosave: {self.autosave} - Autosave Filepath: {self.saved_state_filepath} - Restore Filepath: {self.restore_state_filepath} - -------------------------------- - Metadata: - kwargs: {kwargs} - """, - "cyan", - attrs=["bold", "underline"], - ) + formatter.print_panel( + f""" + Sequential Workflow Dashboard + -------------------------------- + Name: {self.name} + Description: {self.description} + task_pool: {len(self.task_pool)} + Max Loops: {self.max_loops} + Autosave: {self.autosave} + Autosave Filepath: {self.saved_state_filepath} + Restore Filepath: {self.restore_state_filepath} + -------------------------------- + Metadata: + kwargs: {kwargs} + """ ) def workflow_bootup(self, **kwargs) -> None: @@ -409,11 +387,6 @@ class BaseWorkflow(BaseStructure): Workflow bootup. """ - print( - colored( - """ - Sequential Workflow Initializing...""", - "green", - attrs=["bold", "underline"], - ) + formatter.print_panel( + """Sequential Workflow Initializing...""", ) diff --git a/swarms/structs/conversation.py b/swarms/structs/conversation.py index f808382d..a86a6d3b 100644 --- a/swarms/structs/conversation.py +++ b/swarms/structs/conversation.py @@ -3,10 +3,9 @@ import json from typing import Any, Optional import yaml -from termcolor import colored - from swarms.structs.base_structure import BaseStructure from typing import TYPE_CHECKING +from swarms.utils.formatter import formatter if TYPE_CHECKING: from swarms.structs.agent import ( @@ -191,18 +190,9 @@ class Conversation(BaseStructure): Args: detailed (bool, optional): detailed. Defaults to False. """ - role_to_color = { - "system": "red", - "user": "green", - "assistant": "blue", - "function": "magenta", - } for message in self.conversation_history: - print( - colored( - f"{message['role']}: {message['content']}\n\n", - role_to_color[message["role"]], - ) + formatter.print_panel( + f"{message['role']}: {message['content']}\n\n" ) def export_conversation(self, filename: str, *args, **kwargs): @@ -307,46 +297,36 @@ class Conversation(BaseStructure): for message in messages: if message["role"] == "system": - print( - colored( - f"system: {message['content']}\n", - role_to_color[message["role"]], - ) + formatter.print_panel( + f"system: {message['content']}\n", + role_to_color[message["role"]], ) elif message["role"] == "user": - print( - colored( - f"user: {message['content']}\n", - role_to_color[message["role"]], - ) + formatter.print_panel( + f"user: {message['content']}\n", + role_to_color[message["role"]], ) elif message["role"] == "assistant" and message.get( "function_call" ): - print( - colored( - f"assistant: {message['function_call']}\n", - role_to_color[message["role"]], - ) + formatter.print_panel( + f"assistant: {message['function_call']}\n", + role_to_color[message["role"]], ) elif message["role"] == "assistant" and not message.get( "function_call" ): - print( - colored( - f"assistant: {message['content']}\n", - role_to_color[message["role"]], - ) + formatter.print_panel( + f"assistant: {message['content']}\n", + role_to_color[message["role"]], ) elif message["role"] == "tool": - print( - colored( - ( - f"function ({message['name']}):" - f" {message['content']}\n" - ), - role_to_color[message["role"]], - ) + formatter.print_panel( + ( + f"function ({message['name']}):" + f" {message['content']}\n" + ), + role_to_color[message["role"]], ) def truncate_memory_with_tokenizer(self): diff --git a/swarms/structs/mixture_of_agents.py b/swarms/structs/mixture_of_agents.py index f80701ef..e91d565f 100644 --- a/swarms/structs/mixture_of_agents.py +++ b/swarms/structs/mixture_of_agents.py @@ -86,9 +86,7 @@ class MixtureOfAgents: self.input_schema = MixtureOfAgentsInput( name=name, description=description, - agents=[ - agent.to_dict() for agent in self.agents - ], + agents=[agent.to_dict() for agent in self.agents], aggregator_agent=aggregator_agent.to_dict(), aggregator_system_prompt=self.aggregator_system_prompt, layers=self.layers, diff --git a/swarms/structs/multi_agent_exec.py b/swarms/structs/multi_agent_exec.py index d733f49f..b66af8a5 100644 --- a/swarms/structs/multi_agent_exec.py +++ b/swarms/structs/multi_agent_exec.py @@ -414,7 +414,7 @@ def run_agents_with_tasks_concurrently( List[Any]: A list of outputs from each agent execution. """ # Make the first agent not use the ifrs - + if no_clusterops: return _run_agents_with_tasks_concurrently( agents, tasks, batch_size, max_workers diff --git a/swarms/structs/rearrange.py b/swarms/structs/rearrange.py index 7c71bd04..a61efbee 100644 --- a/swarms/structs/rearrange.py +++ b/swarms/structs/rearrange.py @@ -121,16 +121,14 @@ class AgentRearrange(BaseSwarm): output_type: OutputType = "final", docs: List[str] = None, doc_folder: str = None, + device: str = "cpu", + device_id: int = 0, + all_cores: bool = False, + all_gpus: bool = True, + no_use_clusterops: bool = True, *args, **kwargs, ): - # reliability_check( - # agents=agents, - # name=name, - # description=description, - # flow=flow, - # max_loops=max_loops, - # ) super(AgentRearrange, self).__init__( name=name, description=description, @@ -150,6 +148,11 @@ class AgentRearrange(BaseSwarm): self.output_type = output_type self.docs = docs self.doc_folder = doc_folder + self.device = device + self.device_id = device_id + self.all_cores = all_cores + self.all_gpus = all_gpus + self.no_use_clusterops = no_use_clusterops self.swarm_history = { agent.agent_name: [] for agent in agents } @@ -506,7 +509,11 @@ class AgentRearrange(BaseSwarm): Returns: The result from executing the task through the cluster operations wrapper. """ - if no_use_clusterops: + no_use_clusterops = ( + no_use_clusterops or self.no_use_clusterops + ) + + if no_use_clusterops is True: return self._run( task=task, img=img, diff --git a/swarms/structs/sequential_workflow.py b/swarms/structs/sequential_workflow.py index cebcd7f0..ed55102d 100644 --- a/swarms/structs/sequential_workflow.py +++ b/swarms/structs/sequential_workflow.py @@ -107,7 +107,7 @@ class SequentialWorkflow: all_cores: bool = False, all_gpus: bool = False, device_id: int = 0, - no_use_clusterops: bool = False, + no_use_clusterops: bool = True, *args, **kwargs, ) -> str: diff --git a/swarms/tools/json_former.py b/swarms/tools/json_former.py index 01d608a5..dcca9932 100644 --- a/swarms/tools/json_former.py +++ b/swarms/tools/json_former.py @@ -1,7 +1,6 @@ import json from typing import Any, Dict, List, Union -from termcolor import cprint from transformers import PreTrainedModel, PreTrainedTokenizer from pydantic import BaseModel from swarms.tools.logits_processor import ( @@ -68,15 +67,6 @@ class Jsonformer: self.temperature = temperature self.max_string_token_length = max_string_token_length - def debug(self, caller: str, value: str, is_prompt: bool = False): - if self.debug_on: - if is_prompt: - cprint(caller, "green", end=" ") - cprint(value, "yellow") - else: - cprint(caller, "green", end=" ") - cprint(value, "blue") - def generate_number( self, temperature: Union[float, None] = None, iterations=0 ): diff --git a/swarms/tools/tool_utils.py b/swarms/tools/tool_utils.py index 9076e2d1..972f98ec 100644 --- a/swarms/tools/tool_utils.py +++ b/swarms/tools/tool_utils.py @@ -3,8 +3,7 @@ from typing import Any, List import inspect from typing import Callable - -from termcolor import colored +from swarms.utils.formatter import formatter def scrape_tool_func_docs(fn: Callable) -> str: @@ -37,17 +36,16 @@ def scrape_tool_func_docs(fn: Callable) -> str: f" {inspect.getdoc(fn)}\nParameters:\n{parameters_str}" ) except Exception as error: - print( - colored( - ( - f"Error scraping tool function docs {error} try" - " optimizing your inputs with different" - " variables and attempt once more." - ), - "red", - ) + error_msg = ( + formatter.print_panel( + f"Error scraping tool function docs {error} try" + " optimizing your inputs with different" + " variables and attempt once more." + ), ) + raise error + def tool_find_by_name(tool_name: str, tools: List[Any]): """Find the tool by name""" diff --git a/swarms/utils/callable_name.py b/swarms/utils/callable_name.py new file mode 100644 index 00000000..9a0b037f --- /dev/null +++ b/swarms/utils/callable_name.py @@ -0,0 +1,203 @@ +from typing import Any +import inspect +from functools import partial +import logging + + +class NameResolver: + """Utility class for resolving names of various objects""" + + @staticmethod + def get_name(obj: Any, default: str = "unnamed_callable") -> str: + """ + Get the name of any object with multiple fallback strategies. + + Args: + obj: The object to get the name from + default: Default name if all strategies fail + + Returns: + str: The resolved name + """ + strategies = [ + # Try getting __name__ attribute + lambda x: getattr(x, "__name__", None), + # Try getting class name + lambda x: ( + x.__class__.__name__ + if hasattr(x, "__class__") + else None + ), + # Try getting function name if it's a partial + lambda x: ( + x.func.__name__ if isinstance(x, partial) else None + ), + # Try getting the name from the class's type + lambda x: type(x).__name__, + # Try getting qualname + lambda x: getattr(x, "__qualname__", None), + # Try getting the module and class name + lambda x: ( + f"{x.__module__}.{x.__class__.__name__}" + if hasattr(x, "__module__") + else None + ), + # For async functions + lambda x: ( + x.__name__ if inspect.iscoroutinefunction(x) else None + ), + # For classes with custom __str__ + lambda x: ( + str(x) + if hasattr(x, "__str__") + and x.__str__ != object.__str__ + else None + ), + # For wrapped functions + lambda x: ( + getattr(x, "__wrapped__", None).__name__ + if hasattr(x, "__wrapped__") + else None + ), + ] + + # Try each strategy + for strategy in strategies: + try: + name = strategy(obj) + if name and isinstance(name, str): + return name.replace(" ", "_").replace("-", "_") + except Exception: + continue + + # Return default if all strategies fail + return default + + @staticmethod + def get_callable_details(obj: Any) -> dict: + """ + Get detailed information about a callable object. + + Returns: + dict: Dictionary containing: + - name: The resolved name + - type: The type of callable + - signature: The signature if available + - module: The module name if available + - doc: The docstring if available + """ + details = { + "name": NameResolver.get_name(obj), + "type": "unknown", + "signature": None, + "module": getattr(obj, "__module__", "unknown"), + "doc": inspect.getdoc(obj) + or "No documentation available", + } + + # Determine the type + if inspect.isclass(obj): + details["type"] = "class" + elif inspect.iscoroutinefunction(obj): + details["type"] = "async_function" + elif inspect.isfunction(obj): + details["type"] = "function" + elif isinstance(obj, partial): + details["type"] = "partial" + elif callable(obj): + details["type"] = "callable" + + # Try to get signature + try: + details["signature"] = str(inspect.signature(obj)) + except (ValueError, TypeError): + details["signature"] = "Unknown signature" + + return details + + @classmethod + def get_safe_name(cls, obj: Any, max_retries: int = 3) -> str: + """ + Safely get a name with retries and validation. + + Args: + obj: Object to get name from + max_retries: Maximum number of retry attempts + + Returns: + str: A valid name string + """ + retries = 0 + last_error = None + + while retries < max_retries: + try: + name = cls.get_name(obj) + + # Validate and clean the name + if name: + # Remove invalid characters + clean_name = "".join( + c + for c in name + if c.isalnum() or c in ["_", "."] + ) + + # Ensure it starts with a letter or underscore + if ( + not clean_name[0].isalpha() + and clean_name[0] != "_" + ): + clean_name = f"_{clean_name}" + + return clean_name + + except Exception as e: + last_error = e + retries += 1 + + # If all retries failed, generate a unique fallback name + import uuid + + fallback = f"callable_{uuid.uuid4().hex[:8]}" + logging.warning( + f"Failed to get name after {max_retries} retries. Using fallback: {fallback}. " + f"Last error: {str(last_error)}" + ) + return fallback + + +# # Example usage +# if __name__ == "__main__": +# def test_resolver(): +# # Test cases +# class TestClass: +# def method(self): +# pass + +# async def async_func(): +# pass + +# test_cases = [ +# TestClass, # Class +# TestClass(), # Instance +# async_func, # Async function +# lambda x: x, # Lambda +# partial(print, end=""), # Partial +# TestClass.method, # Method +# print, # Built-in function +# str, # Built-in class +# ] + +# resolver = NameResolver() + +# print("\nName Resolution Results:") +# print("-" * 50) +# for obj in test_cases: +# details = resolver.get_callable_details(obj) +# safe_name = resolver.get_safe_name(obj) +# print(f"\nObject: {obj}") +# print(f"Safe Name: {safe_name}") +# print(f"Details: {details}") + +# test_resolver() diff --git a/swarms/utils/markdown_message.py b/swarms/utils/markdown_message.py index a85cb4a1..03a35092 100644 --- a/swarms/utils/markdown_message.py +++ b/swarms/utils/markdown_message.py @@ -1,4 +1,4 @@ -from termcolor import colored +from swarms.utils.formatter import formatter def display_markdown_message(message: str, color: str = "cyan"): @@ -12,13 +12,10 @@ def display_markdown_message(message: str, color: str = "cyan"): if line == "": print() elif line == "---": - print(colored("-" * 50, color)) + formatter.print_panel("-" * 50) else: - print(colored(line, color)) + formatter.print_panel(line) if "\n" not in message and message.startswith(">"): # Aesthetic choice. For these tags, they need a space below them print() - - -# display_markdown_message("I love you and you are beautiful.", "cyan")