diff --git a/conversation_test.py b/conversation_test.py new file mode 100644 index 00000000..ec8a0534 --- /dev/null +++ b/conversation_test.py @@ -0,0 +1,22 @@ +from swarms.structs.conversation import Conversation + +# Create a conversation object +conversation = Conversation(backend="in-memory") + +# Add a message to the conversation +conversation.add( + role="user", content="Hello, how are you?", category="input" +) + +# Add a message to the conversation +conversation.add( + role="assistant", + content="I'm good, thank you!", + category="output", +) + +print( + conversation.export_and_count_categories( + tokenizer_model_name="claude-3-5-sonnet-20240620" + ) +) diff --git a/realtor_agent.py b/realtor_agent.py index 3ec208f7..a2c9700c 100644 --- a/realtor_agent.py +++ b/realtor_agent.py @@ -1,12 +1,13 @@ -from typing import List import http.client import json -from swarms import Agent +import os +from typing import List from dotenv import load_dotenv +from swarms import Agent + load_dotenv() -import os def get_realtor_data_from_one_source(location: str): @@ -28,7 +29,7 @@ def get_realtor_data_from_one_source(location: str): ) headers = { - "x-rapidapi-key": os.getenv("RAPIDAPI_KEY"), + "x-rapidapi-key": os.getenv("RAPID_API_KEY"), "x-rapidapi-host": "realtor-search.p.rapidapi.com", } @@ -47,15 +48,15 @@ def get_realtor_data_from_one_source(location: str): res = conn.getresponse() data = res.read() - return "chicken data" + # return "chicken data" - # # Parse and format the response - # try: - # json_data = json.loads(data.decode("utf-8")) - # # Return formatted string instead of raw JSON - # return json.dumps(json_data, indent=2) - # except json.JSONDecodeError: - # return "Error: Could not parse API response" + # Parse and format the response + try: + json_data = json.loads(data.decode("utf-8")) + # Return formatted string instead of raw JSON + return json.dumps(json_data, indent=2) + except json.JSONDecodeError: + return "Error: Could not parse API response" def get_realtor_data_from_multiple_sources( @@ -144,11 +145,11 @@ When you receive property data: Provide clear, objective analysis while maintaining professional standards and ethical considerations.""", model_name="claude-3-sonnet-20240229", max_loops=1, - tools=[get_realtor_data_from_one_source], print_on=True, + streaming_on=True, ) agent.run( - "What are the best properties in Menlo Park, CA for rent under 3,000$?" + f"Create a report on the best properties in Menlo Park, CA, showcase, the name, description, price, and link to the property: {get_realtor_data_from_one_source('Menlo Park, CA')}" ) diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index dab44638..33c3fd91 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -801,10 +801,11 @@ class Agent: or exists(self.mcp_urls) or exists(self.mcp_config) ): - self.pretty_print( - f"✨ [SYSTEM] Successfully integrated {len(tools)} MCP tools into agent: {self.agent_name} | Status: ONLINE | Time: {time.strftime('%H:%M:%S')} ✨", - loop_count=0, - ) + if self.print_on is True: + self.pretty_print( + f"✨ [SYSTEM] Successfully integrated {len(tools)} MCP tools into agent: {self.agent_name} | Status: ONLINE | Time: {time.strftime('%H:%M:%S')} ✨", + loop_count=0, + ) return tools except AgentMCPConnectionError as e: @@ -1087,6 +1088,8 @@ class Agent: **kwargs, ) + # If streaming is enabled, then don't print the response + # Parse the response from the agent with the output type if exists(self.tools_list_dictionary): if isinstance(response, BaseModel): @@ -1107,6 +1110,8 @@ class Agent: f"Structured Output - Attempting Function Call Execution [{time.strftime('%H:%M:%S')}] \n\n {format_data_structure(response)} ", loop_count, ) + elif self.streaming_on is True: + pass else: self.pretty_print( response, loop_count @@ -1147,8 +1152,12 @@ class Agent: self.save() logger.error( - f"Attempt {attempt+1}: Error generating" - f" response: {e}" + f"Attempt {attempt+1}/{self.max_retries}: Error generating response in loop {loop_count} for agent '{self.agent_name}': {str(e)} | " + f"Error type: {type(e).__name__}, Error details: {e.__dict__ if hasattr(e, '__dict__') else 'No additional details'} | " + f"Current task: '{task}', Agent state: max_loops={self.max_loops}, " + f"model={getattr(self.llm, 'model_name', 'unknown')}, " + f"temperature={getattr(self.llm, 'temperature', 'unknown')}" + f"{f' | Traceback: {e.__traceback__}' if hasattr(e, '__traceback__') else ''}" ) attempt += 1 @@ -1170,13 +1179,19 @@ class Agent: self.stopping_condition is not None and self._check_stopping_condition(response) ): - logger.info("Stopping condition met.") + logger.info( + f"Agent '{self.agent_name}' stopping condition met. " + f"Loop: {loop_count}, Response length: {len(str(response)) if response else 0}" + ) break elif ( self.stopping_func is not None and self.stopping_func(response) ): - logger.info("Stopping function met.") + logger.info( + f"Agent '{self.agent_name}' stopping function condition met. " + f"Loop: {loop_count}, Response length: {len(str(response)) if response else 0}" + ) break if self.interactive: @@ -1223,14 +1238,27 @@ class Agent: self._handle_run_error(error) def __handle_run_error(self, error: any): + import traceback + log_agent_data(self.to_dict()) if self.autosave is True: self.save() - logger.info( - f"Error detected running your agent {self.agent_name} \n Error {error} \n Optimize your input parameters and or add an issue on the swarms github and contact our team on discord for support ;) " + # Get detailed error information + error_type = type(error).__name__ + error_message = str(error) + traceback_info = traceback.format_exc() + + logger.error( + f"Error detected running your agent {self.agent_name}\n" + f"Error Type: {error_type}\n" + f"Error Message: {error_message}\n" + f"Traceback:\n{traceback_info}\n" + f"Agent State: {self.to_dict()}\n" + f"Optimize your input parameters and or add an issue on the swarms github and contact our team on discord for support ;)" ) + raise error def _handle_run_error(self, error: any): @@ -2952,7 +2980,8 @@ class Agent: # Fallback: provide a default summary summary = "I successfully executed the MCP tool and retrieved the information above." - self.pretty_print(summary, loop_count=current_loop) + if self.print_on is True: + self.pretty_print(summary, loop_count=current_loop) # Add to the memory self.short_memory.add( @@ -3003,10 +3032,11 @@ class Agent: content=format_data_structure(output), ) - self.pretty_print( - "Tool Executed Successfully", - loop_count, - ) + if self.print_on is True: + self.pretty_print( + f"Tool Executed Successfully [{time.strftime('%H:%M:%S')}]", + loop_count, + ) # Now run the LLM again without tools - create a temporary LLM instance # instead of modifying the cached one @@ -3030,10 +3060,11 @@ class Agent: content=tool_response, ) - self.pretty_print( - tool_response, - loop_count, - ) + if self.print_on is True: + self.pretty_print( + tool_response, + loop_count, + ) def list_output_types(self): return OutputType diff --git a/swarms/structs/conversation.py b/swarms/structs/conversation.py index 91d06154..82493f38 100644 --- a/swarms/structs/conversation.py +++ b/swarms/structs/conversation.py @@ -221,27 +221,6 @@ class Conversation(BaseStructure): ): super().__init__() - # Support both 'provider' and 'backend' parameters for backwards compatibility - # 'backend' takes precedence if both are provided - self.backend = backend or provider - self.backend_instance = None - - # Validate backend - valid_backends = [ - "in-memory", - "mem0", - "supabase", - "redis", - "sqlite", - "duckdb", - "pulsar", - ] - if self.backend not in valid_backends: - raise ValueError( - f"Invalid backend: '{self.backend}'. " - f"Valid backends are: {', '.join(valid_backends)}" - ) - # Initialize all attributes first self.id = id self.name = name or id @@ -275,6 +254,27 @@ class Conversation(BaseStructure): self.provider = provider # Keep for backwards compatibility self.conversations_dir = conversations_dir + # Support both 'provider' and 'backend' parameters for backwards compatibility + # 'backend' takes precedence if both are provided + self.backend = backend or provider + self.backend_instance = None + + # Validate backend + valid_backends = [ + "in-memory", + "mem0", + "supabase", + "redis", + "sqlite", + "duckdb", + "pulsar", + ] + if self.backend not in valid_backends: + raise ValueError( + f"Invalid backend: '{self.backend}'. " + f"Valid backends are: {', '.join(valid_backends)}" + ) + # Initialize backend if using persistent storage if self.backend in [ "supabase", @@ -484,8 +484,7 @@ class Conversation(BaseStructure): self, role: str, content: Union[str, dict, list, Any], - *args, - **kwargs, + category: Optional[str] = None, ): """Add a message to the conversation history. @@ -505,6 +504,9 @@ class Conversation(BaseStructure): if self.message_id_on: message["message_id"] = str(uuid.uuid4()) + if category: + message["category"] = category + # Add message to conversation history self.conversation_history.append(message) @@ -520,6 +522,79 @@ class Conversation(BaseStructure): f"Failed to autosave conversation: {str(e)}" ) + def export_and_count_categories( + self, tokenizer_model_name: Optional[str] = "gpt-4.1-mini" + ) -> Dict[str, int]: + """Export all messages with category 'input' and 'output' and count their tokens. + + This method searches through the conversation history and: + 1. Extracts all messages marked with category 'input' or 'output' + 2. Concatenates the content of each category + 3. Counts tokens for each category using the specified tokenizer model + + Args: + tokenizer_model_name (str): Name of the model to use for tokenization + + Returns: + Dict[str, int]: A dictionary containing: + - input_tokens: Number of tokens in input messages + - output_tokens: Number of tokens in output messages + - total_tokens: Total tokens across both categories + """ + try: + # Extract input and output messages + input_messages = [] + output_messages = [] + + for message in self.conversation_history: + # Get message content and ensure it's a string + content = message.get("content", "") + if not isinstance(content, str): + content = str(content) + + # Sort messages by category + category = message.get("category", "") + if category == "input": + input_messages.append(content) + elif category == "output": + output_messages.append(content) + + # Join messages with spaces + all_input_text = " ".join(input_messages) + all_output_text = " ".join(output_messages) + + print(all_input_text) + print(all_output_text) + + # Count tokens only if there is text + input_tokens = ( + count_tokens(all_input_text, tokenizer_model_name) + if all_input_text.strip() + else 0 + ) + output_tokens = ( + count_tokens(all_output_text, tokenizer_model_name) + if all_output_text.strip() + else 0 + ) + total_tokens = input_tokens + output_tokens + + return { + "input_tokens": input_tokens, + "output_tokens": output_tokens, + "total_tokens": total_tokens, + } + + except Exception as e: + logger.error( + f"Error in export_and_count_categories: {str(e)}" + ) + return { + "input_tokens": 0, + "output_tokens": 0, + "total_tokens": 0, + } + def add_mem0( self, role: str, @@ -546,8 +621,9 @@ class Conversation(BaseStructure): def add( self, role: str, - content: Union[str, dict, list], + content: Union[str, dict, list, Any], metadata: Optional[dict] = None, + category: Optional[str] = None, ): """Add a message to the conversation history.""" # If using a persistent backend, delegate to it @@ -562,7 +638,9 @@ class Conversation(BaseStructure): ) return self.add_in_memory(role, content) elif self.provider == "in-memory": - return self.add_in_memory(role, content) + return self.add_in_memory( + role=role, content=content, category=category + ) elif self.provider == "mem0": return self.add_mem0( role=role, content=content, metadata=metadata diff --git a/swarms/structs/swarm_router.py b/swarms/structs/swarm_router.py index 3452343b..77f3d189 100644 --- a/swarms/structs/swarm_router.py +++ b/swarms/structs/swarm_router.py @@ -466,14 +466,10 @@ class SwarmRouter: def update_system_prompt_for_agent_in_swarm(self): # Use list comprehension for faster iteration - [ - setattr( - agent, - "system_prompt", - agent.system_prompt + MULTI_AGENT_COLLAB_PROMPT_TWO, - ) - for agent in self.agents - ] + for agent in self.agents: + if agent.system_prompt is None: + agent.system_prompt = "" + agent.system_prompt += MULTI_AGENT_COLLAB_PROMPT_TWO def agent_config(self): agent_config = {}