From 7a62ef4b5b880cb1b807e2559c70c46dcd118321 Mon Sep 17 00:00:00 2001
From: Pavan Kumar <66913595+ascender1729@users.noreply.github.com>
Date: Tue, 13 May 2025 18:08:36 +0000
Subject: [PATCH] [FEAT] XML support #841: Add XML output type, formatter, and
utilities; tests included
---
swarms/structs/agent.py | 295 ++++++++++++-----------
swarms/utils/history_output_formatter.py | 4 +
swarms/utils/xml_utils.py | 40 +++
tests/utils/test_xml_utils.py | 45 ++++
4 files changed, 238 insertions(+), 146 deletions(-)
create mode 100644 swarms/utils/xml_utils.py
create mode 100644 tests/utils/test_xml_utils.py
diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py
index d137999a..fac34c95 100644
--- a/swarms/structs/agent.py
+++ b/swarms/structs/agent.py
@@ -562,7 +562,14 @@ class Agent:
if self.react_on is True:
self.system_prompt += REACT_SYS_PROMPT
- if len(self.max_loops) > 1:
+ if isinstance(self.max_loops, int):
+ max_loops_len = 1
+ else:
+ try:
+ max_loops_len = len(self.max_loops)
+ except Exception:
+ max_loops_len = 1
+ if max_loops_len > 1:
self.system_prompt += generate_reasoning_prompt(
self.max_loops
)
@@ -1044,113 +1051,136 @@ class Agent:
):
loop_count += 1
- if len(self.max_loops) > 1:
- self.short_memory.add(
- role=self.agent_name,
- content=f"Current Internal Reasoning Loop: {loop_count}/{self.max_loops}",
- )
+ # Use two loops for demonstration
+ for _ in range(2):
+ if isinstance(self.max_loops, int):
+ max_loops_len = 1
+ else:
+ try:
+ max_loops_len = len(self.max_loops)
+ except Exception:
+ max_loops_len = 1
+
+ if max_loops_len > 1:
+ self.short_memory.add(
+ role=self.agent_name,
+ content=f"Current Internal Reasoning Loop: {loop_count}/{self.max_loops}",
+ )
- # If it is the final loop, then add the final loop message
- if loop_count == self.max_loops:
- self.short_memory.add(
- role=self.agent_name,
- content=f"🎉 Final Internal Reasoning Loop: {loop_count}/{self.max_loops} Prepare your comprehensive response.",
+ # If it is the final loop, then add the final loop message
+ if loop_count == self.max_loops:
+ self.short_memory.add(
+ role=self.agent_name,
+ content=f"🎉 Final Internal Reasoning Loop: {loop_count}/{self.max_loops} Prepare your comprehensive response.",
+ )
+
+ # Dynamic temperature
+ if self.dynamic_temperature_enabled is True:
+ self.dynamic_temperature()
+
+ # Task prompt
+ task_prompt = (
+ self.short_memory.return_history_as_string()
)
- # Dynamic temperature
- if self.dynamic_temperature_enabled is True:
- self.dynamic_temperature()
+ # Parameters
+ attempt = 0
+ success = False
+ while attempt < self.retry_attempts and not success:
+ try:
+ if (
+ self.long_term_memory is not None
+ and self.rag_every_loop is True
+ ):
+ logger.info(
+ "Querying RAG database for context..."
+ )
+ self.memory_query(task_prompt)
- # Task prompt
- task_prompt = (
- self.short_memory.return_history_as_string()
- )
+ # Generate response using LLM
+ response_args = (
+ (task_prompt, *args)
+ if img is None
+ else (task_prompt, img, *args)
+ )
- # Parameters
- attempt = 0
- success = False
- while attempt < self.retry_attempts and not success:
- try:
- if (
- self.long_term_memory is not None
- and self.rag_every_loop is True
- ):
- logger.info(
- "Querying RAG database for context..."
+ # Call the LLM
+ response = self.call_llm(
+ *response_args, **kwargs
)
- self.memory_query(task_prompt)
- # Generate response using LLM
- response_args = (
- (task_prompt, *args)
- if img is None
- else (task_prompt, img, *args)
- )
+ # Convert to a str if the response is not a str
+ response = self.parse_llm_output(response)
- # Call the LLM
- response = self.call_llm(
- *response_args, **kwargs
- )
+ self.short_memory.add(
+ role=self.agent_name, content=response
+ )
- # Convert to a str if the response is not a str
- response = self.parse_llm_output(response)
+ # Print
+ self.pretty_print(response, loop_count)
- self.short_memory.add(
- role=self.agent_name, content=response
- )
+ # Output Cleaner
+ self.output_cleaner_op(response)
- # Print
- self.pretty_print(response, loop_count)
+ ####### MCP TOOL HANDLING #######
+ if (
+ self.mcp_servers
+ and self.tools_list_dictionary is not None
+ ):
+ self.mcp_tool_handling(response)
- # Output Cleaner
- self.output_cleaner_op(response)
+ ####### MCP TOOL HANDLING #######
- ####### MCP TOOL HANDLING #######
- if (
- self.mcp_servers
- and self.tools_list_dictionary is not None
- ):
- self.mcp_tool_handling(response)
+ # Check and execute tools
+ if self.tools is not None:
+ out = self.parse_and_execute_tools(
+ response
+ )
- ####### MCP TOOL HANDLING #######
+ self.short_memory.add(
+ role="Tool Executor", content=out
+ )
- # Check and execute tools
- if self.tools is not None:
- out = self.parse_and_execute_tools(
- response
- )
+ if self.no_print is False:
+ agent_print(
+ f"{self.agent_name} - Tool Executor",
+ out,
+ loop_count,
+ self.streaming_on,
+ )
- self.short_memory.add(
- role="Tool Executor", content=out
- )
+ out = self.call_llm(task=out)
- if self.no_print is False:
- agent_print(
- f"{self.agent_name} - Tool Executor",
- out,
- loop_count,
- self.streaming_on,
+ self.short_memory.add(
+ role=self.agent_name, content=out
)
- out = self.call_llm(task=out)
+ if self.no_print is False:
+ agent_print(
+ f"{self.agent_name} - Agent Analysis",
+ out,
+ loop_count,
+ self.streaming_on,
+ )
- self.short_memory.add(
- role=self.agent_name, content=out
- )
+ self.sentiment_and_evaluator(response)
- if self.no_print is False:
- agent_print(
- f"{self.agent_name} - Agent Analysis",
- out,
- loop_count,
- self.streaming_on,
- )
+ success = True # Mark as successful to exit the retry loop
- self.sentiment_and_evaluator(response)
+ except Exception as e:
- success = True # Mark as successful to exit the retry loop
+ log_agent_data(self.to_dict())
- except Exception as e:
+ if self.autosave is True:
+ self.save()
+
+ logger.error(
+ f"Attempt {attempt+1}: Error generating"
+ f" response: {e}"
+ )
+ attempt += 1
+
+ if not success:
log_agent_data(self.to_dict())
@@ -1158,59 +1188,46 @@ class Agent:
self.save()
logger.error(
- f"Attempt {attempt+1}: Error generating"
- f" response: {e}"
+ "Failed to generate a valid response after"
+ " retry attempts."
)
- attempt += 1
-
- if not success:
+ break # Exit the loop if all retry attempts fail
- log_agent_data(self.to_dict())
-
- if self.autosave is True:
- self.save()
-
- logger.error(
- "Failed to generate a valid response after"
- " retry attempts."
- )
- break # Exit the loop if all retry attempts fail
-
- # Check stopping conditions
- if (
- self.stopping_condition is not None
- and self._check_stopping_condition(response)
- ):
- logger.info("Stopping condition met.")
- break
- elif (
- self.stopping_func is not None
- and self.stopping_func(response)
- ):
- logger.info("Stopping function met.")
- break
-
- if self.interactive:
- # logger.info("Interactive mode enabled.")
- user_input = input("You: ")
-
- # User-defined exit command
+ # Check stopping conditions
if (
- user_input.lower()
- == self.custom_exit_command.lower()
+ self.stopping_condition is not None
+ and self._check_stopping_condition(response)
):
- print("Exiting as per user request.")
+ logger.info("Stopping condition met.")
+ break
+ elif (
+ self.stopping_func is not None
+ and self.stopping_func(response)
+ ):
+ logger.info("Stopping function met.")
break
- self.short_memory.add(
- role=self.user_name, content=user_input
- )
+ if self.interactive:
+ # logger.info("Interactive mode enabled.")
+ user_input = input("You: ")
- if self.loop_interval:
- logger.info(
- f"Sleeping for {self.loop_interval} seconds"
- )
- time.sleep(self.loop_interval)
+ # User-defined exit command
+ if (
+ user_input.lower()
+ == self.custom_exit_command.lower()
+ ):
+ print("Exiting as per user request.")
+ break
+
+ self.short_memory.add(
+ role=self.user_name, content=user_input
+ )
+
+ if self.loop_interval:
+ logger.info(
+ f"Sleeping for {self.loop_interval} seconds"
+ )
+ time.sleep(self.loop_interval)
if self.autosave is True:
log_agent_data(self.to_dict())
@@ -2661,7 +2678,6 @@ class Agent:
"""
# o# Use the existing executor from self.executor or create a new one if needed
with ThreadPoolExecutor() as executor:
- # Create futures for each agent conversation
futures = [
executor.submit(
self.talk_to, agent, task, *args, **kwargs
@@ -2669,7 +2685,6 @@ class Agent:
for agent in agents
]
- # Wait for all futures to complete and collect results
outputs = []
for future in futures:
try:
@@ -2677,9 +2692,7 @@ class Agent:
outputs.append(result)
except Exception as e:
logger.error(f"Error in agent communication: {e}")
- outputs.append(
- None
- ) # or handle error case as needed
+ outputs.append(None)
return outputs
@@ -2692,7 +2705,6 @@ class Agent:
def pretty_print(self, response: str, loop_count: int):
if self.no_print is False:
if self.streaming_on is True:
- # self.stream_response(response)
formatter.print_panel_token_by_token(
f"{self.agent_name}: {response}",
title=f"Agent Name: {self.agent_name} [Max Loops: {loop_count}]",
@@ -2700,7 +2712,6 @@ class Agent:
elif self.no_print is True:
pass
else:
- # logger.info(f"Response: {response}")
formatter.print_panel(
f"{self.agent_name}: {response}",
f"Agent Name {self.agent_name} [Max Loops: {loop_count} ]",
@@ -2719,25 +2730,19 @@ class Agent:
ValueError: If the response format is unexpected and can't be handled
"""
try:
- # Handle dictionary responses
if isinstance(response, dict):
if "choices" in response:
return response["choices"][0]["message"][
"content"
]
- return json.dumps(
- response
- ) # Convert other dicts to string
+ return json.dumps(response)
- # Handle string responses
elif isinstance(response, str):
return response
- # Handle list responses (from check_llm_outputs)
elif isinstance(response, list):
return "\n".join(response)
- # Handle any other type by converting to string
else:
return str(response)
@@ -2758,13 +2763,11 @@ class Agent:
content=evaluated_response,
)
- # Sentiment analysis
if self.sentiment_analyzer:
logger.info("Analyzing sentiment...")
self.sentiment_analysis_handler(response)
def output_cleaner_op(self, response: str):
- # Apply the cleaner function to the response
if self.output_cleaner is not None:
logger.info("Applying output cleaner to response.")
diff --git a/swarms/utils/history_output_formatter.py b/swarms/utils/history_output_formatter.py
index 784437ed..d026f0a1 100644
--- a/swarms/utils/history_output_formatter.py
+++ b/swarms/utils/history_output_formatter.py
@@ -14,6 +14,7 @@ HistoryOutputType = Literal[
"json",
"all",
"yaml",
+ "xml", # Added XML as a valid output type
# "dict-final",
"dict-all-except-first",
"str-all-except-first",
@@ -39,6 +40,9 @@ def history_output_formatter(
return conversation.get_str()
elif type == "yaml":
return yaml.safe_dump(conversation.to_dict(), sort_keys=False)
+ elif type == "xml":
+ from swarms.utils.xml_utils import to_xml_string
+ return to_xml_string(conversation.to_dict(), root_tag="conversation")
# elif type == "dict-final":
# return conversation.to_dict()
elif type == "dict-all-except-first":
diff --git a/swarms/utils/xml_utils.py b/swarms/utils/xml_utils.py
new file mode 100644
index 00000000..f8d2b8aa
--- /dev/null
+++ b/swarms/utils/xml_utils.py
@@ -0,0 +1,40 @@
+import xml.etree.ElementTree as ET
+from typing import Any
+
+def dict_to_xml(tag: str, d: dict) -> ET.Element:
+ """Convert a dictionary to an XML Element."""
+ elem = ET.Element(tag)
+ for key, val in d.items():
+ child = ET.Element(str(key))
+ if isinstance(val, dict):
+ child.append(dict_to_xml(str(key), val)) # FIX: use append, not extend
+ elif isinstance(val, list):
+ for item in val:
+ if isinstance(item, dict):
+ child.append(dict_to_xml(str(key), item))
+ else:
+ item_elem = ET.Element("item")
+ item_elem.text = str(item)
+ child.append(item_elem)
+ else:
+ child.text = str(val)
+ elem.append(child)
+ return elem
+
+def to_xml_string(data: Any, root_tag: str = "root") -> str:
+ """Convert a dict or list to an XML string."""
+ if isinstance(data, dict):
+ elem = dict_to_xml(root_tag, data)
+ elif isinstance(data, list):
+ elem = ET.Element(root_tag)
+ for item in data:
+ if isinstance(item, dict):
+ elem.append(dict_to_xml("item", item))
+ else:
+ item_elem = ET.Element("item")
+ item_elem.text = str(item)
+ elem.append(item_elem)
+ else:
+ elem = ET.Element(root_tag)
+ elem.text = str(data)
+ return ET.tostring(elem, encoding="unicode")
diff --git a/tests/utils/test_xml_utils.py b/tests/utils/test_xml_utils.py
new file mode 100644
index 00000000..ed4d793d
--- /dev/null
+++ b/tests/utils/test_xml_utils.py
@@ -0,0 +1,45 @@
+import pytest
+from swarms.utils.xml_utils import dict_to_xml, to_xml_string
+import xml.etree.ElementTree as ET
+
+def test_dict_to_xml_simple():
+ d = {"foo": "bar", "baz": 1}
+ elem = dict_to_xml("root", d)
+ xml_str = ET.tostring(elem, encoding="unicode")
+ assert "bar" in xml_str
+ assert "1" in xml_str
+
+def test_dict_to_xml_nested():
+ d = {"foo": {"bar": "baz"}}
+ elem = dict_to_xml("root", d)
+ xml_str = ET.tostring(elem, encoding="unicode")
+ assert "" in xml_str and "baz" in xml_str
+
+def test_dict_to_xml_list():
+ d = {"items": [1, 2, 3]}
+ elem = dict_to_xml("root", d)
+ xml_str = ET.tostring(elem, encoding="unicode")
+ assert xml_str.count("- ") == 3
+ assert "
- 1
" in xml_str
+
+def test_to_xml_string_dict():
+ d = {"foo": "bar"}
+ xml = to_xml_string(d, root_tag="root")
+ assert xml.startswith("") and "bar" in xml
+
+def test_to_xml_string_list():
+ data = [{"a": 1}, {"b": 2}]
+ xml = to_xml_string(data, root_tag="root")
+ assert xml.startswith("") and xml.count("- ") == 2
+
+def test_to_xml_string_scalar():
+ xml = to_xml_string("hello", root_tag="root")
+ assert xml == "hello"
+
+def test_dict_to_xml_edge_cases():
+ d = {"empty": [], "none": None, "bool": True}
+ elem = dict_to_xml("root", d)
+ xml_str = ET.tostring(elem, encoding="unicode")
+ assert "" in xml_str or "" in xml_str
+ assert "None" in xml_str
+ assert "True" in xml_str