[FEAT] XML support #841: Add XML output type, formatter, and utilities; tests included

pull/845/head
Pavan Kumar 2 months ago
parent 48e7fd8a79
commit 7a62ef4b5b

@@ -562,7 +562,14 @@ class Agent:
         if self.react_on is True:
             self.system_prompt += REACT_SYS_PROMPT
 
-        if len(self.max_loops) > 1:
+        if isinstance(self.max_loops, int):
+            max_loops_len = 1
+        else:
+            try:
+                max_loops_len = len(self.max_loops)
+            except Exception:
+                max_loops_len = 1
+        if max_loops_len > 1:
             self.system_prompt += generate_reasoning_prompt(
                 self.max_loops
             )
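A note on the guard above: max_loops is evidently allowed to be either an int or something with a length (for example a list of prompts, or a string such as "auto"), and calling len() on an int raises TypeError. The sketch below restates the same normalization as a standalone helper; the helper name is hypothetical and not part of this diff.

def normalize_max_loops_len(max_loops) -> int:
    """Mirror of the inline guard: ints count as a single loop spec,
    anything with a length uses len(), everything else falls back to 1."""
    if isinstance(max_loops, int):
        return 1
    try:
        return len(max_loops)
    except TypeError:
        return 1

assert normalize_max_loops_len(3) == 1
assert normalize_max_loops_len("auto") == len("auto")
assert normalize_max_loops_len(["step 1", "step 2"]) == 2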
@@ -1044,113 +1051,136 @@ class Agent:
             ):
                 loop_count += 1
 
-                if len(self.max_loops) > 1:
+                if isinstance(self.max_loops, int):
+                    max_loops_len = 1
+                else:
+                    try:
+                        max_loops_len = len(self.max_loops)
+                    except Exception:
+                        max_loops_len = 1
+                if max_loops_len > 1:
                     self.short_memory.add(
                         role=self.agent_name,
                         content=f"Current Internal Reasoning Loop: {loop_count}/{self.max_loops}",
                     )
 
                 # If it is the final loop, then add the final loop message
                 if loop_count == self.max_loops:
                     self.short_memory.add(
                         role=self.agent_name,
                         content=f"🎉 Final Internal Reasoning Loop: {loop_count}/{self.max_loops} Prepare your comprehensive response.",
                     )
 
                 # Dynamic temperature
                 if self.dynamic_temperature_enabled is True:
                     self.dynamic_temperature()
 
                 # Task prompt
                 task_prompt = (
                     self.short_memory.return_history_as_string()
                 )
 
                 # Parameters
                 attempt = 0
                 success = False
                 while attempt < self.retry_attempts and not success:
                     try:
                         if (
                             self.long_term_memory is not None
                             and self.rag_every_loop is True
                         ):
                             logger.info(
                                 "Querying RAG database for context..."
                             )
                             self.memory_query(task_prompt)
 
                         # Generate response using LLM
                         response_args = (
                             (task_prompt, *args)
                             if img is None
                             else (task_prompt, img, *args)
                         )
 
                         # Call the LLM
                         response = self.call_llm(
                             *response_args, **kwargs
                         )
 
                         # Convert to a str if the response is not a str
                         response = self.parse_llm_output(response)
 
                         self.short_memory.add(
                             role=self.agent_name, content=response
                         )
 
                         # Print
                         self.pretty_print(response, loop_count)
 
                         # Output Cleaner
                         self.output_cleaner_op(response)
 
                         ####### MCP TOOL HANDLING #######
                         if (
                             self.mcp_servers
                             and self.tools_list_dictionary is not None
                         ):
                             self.mcp_tool_handling(response)
                         ####### MCP TOOL HANDLING #######
 
                         # Check and execute tools
                         if self.tools is not None:
                             out = self.parse_and_execute_tools(
                                 response
                             )
 
                             self.short_memory.add(
                                 role="Tool Executor", content=out
                             )
 
                             if self.no_print is False:
                                 agent_print(
                                     f"{self.agent_name} - Tool Executor",
                                     out,
                                     loop_count,
                                     self.streaming_on,
                                 )
 
                             out = self.call_llm(task=out)
 
                             self.short_memory.add(
                                 role=self.agent_name, content=out
                             )
 
                             if self.no_print is False:
                                 agent_print(
                                     f"{self.agent_name} - Agent Analysis",
                                     out,
                                     loop_count,
                                     self.streaming_on,
                                 )
 
                         self.sentiment_and_evaluator(response)
 
                         success = True  # Mark as successful to exit the retry loop
 
                     except Exception as e:
                         log_agent_data(self.to_dict())
                         if self.autosave is True:
                             self.save()
                         logger.error(
                             f"Attempt {attempt+1}: Error generating"
                             f" response: {e}"
                         )
                         attempt += 1
 
                 if not success:
                     log_agent_data(self.to_dict())
@@ -1158,59 +1188,46 @@ class Agent:
                     if self.autosave is True:
                         self.save()
                     logger.error(
                         "Failed to generate a valid response after"
                         " retry attempts."
                     )
                     break  # Exit the loop if all retry attempts fail
 
                 # Check stopping conditions
                 if (
                     self.stopping_condition is not None
                     and self._check_stopping_condition(response)
                 ):
                     logger.info("Stopping condition met.")
                     break
                 elif (
                     self.stopping_func is not None
                     and self.stopping_func(response)
                 ):
                     logger.info("Stopping function met.")
                     break
 
                 if self.interactive:
                     # logger.info("Interactive mode enabled.")
                     user_input = input("You: ")
 
                     # User-defined exit command
                     if (
                         user_input.lower()
                         == self.custom_exit_command.lower()
                     ):
                         print("Exiting as per user request.")
                         break
 
                     self.short_memory.add(
                         role=self.user_name, content=user_input
                     )
 
                 if self.loop_interval:
                     logger.info(
                         f"Sleeping for {self.loop_interval} seconds"
                     )
                     time.sleep(self.loop_interval)
 
             if self.autosave is True:
                 log_agent_data(self.to_dict())
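A side note on the stopping hooks exercised above: both self.stopping_condition (via _check_stopping_condition) and self.stopping_func appear to be callables that receive the latest response string and return a truthy value to end the loop early. A minimal sketch of such a predicate follows; how it gets attached to the Agent is assumed here and is not shown in this diff.

def contains_final_answer(response: str) -> bool:
    """Return True once the model has marked its answer as final."""
    markers = ("Final Answer:", "<DONE>")
    return any(marker in response for marker in markers)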
@@ -2661,7 +2678,6 @@ class Agent:
         """
         # Use the existing executor from self.executor or create a new one if needed
         with ThreadPoolExecutor() as executor:
-            # Create futures for each agent conversation
             futures = [
                 executor.submit(
                     self.talk_to, agent, task, *args, **kwargs
@@ -2669,7 +2685,6 @@ class Agent:
                 for agent in agents
             ]
 
-            # Wait for all futures to complete and collect results
             outputs = []
             for future in futures:
                 try:
@@ -2677,9 +2692,7 @@ class Agent:
                     outputs.append(result)
                 except Exception as e:
                     logger.error(f"Error in agent communication: {e}")
-                    outputs.append(
-                        None
-                    )  # or handle error case as needed
+                    outputs.append(None)
 
         return outputs
@@ -2692,7 +2705,6 @@ class Agent:
     def pretty_print(self, response: str, loop_count: int):
         if self.no_print is False:
             if self.streaming_on is True:
-                # self.stream_response(response)
                 formatter.print_panel_token_by_token(
                     f"{self.agent_name}: {response}",
                     title=f"Agent Name: {self.agent_name} [Max Loops: {loop_count}]",
@@ -2700,7 +2712,6 @@ class Agent:
         elif self.no_print is True:
             pass
         else:
-            # logger.info(f"Response: {response}")
             formatter.print_panel(
                 f"{self.agent_name}: {response}",
                 f"Agent Name {self.agent_name} [Max Loops: {loop_count} ]",
@@ -2719,25 +2730,19 @@ class Agent:
             ValueError: If the response format is unexpected and can't be handled
         """
         try:
-            # Handle dictionary responses
             if isinstance(response, dict):
                 if "choices" in response:
                     return response["choices"][0]["message"][
                         "content"
                     ]
-                return json.dumps(
-                    response
-                )  # Convert other dicts to string
+                return json.dumps(response)
-            # Handle string responses
             elif isinstance(response, str):
                 return response
-            # Handle list responses (from check_llm_outputs)
             elif isinstance(response, list):
                 return "\n".join(response)
-            # Handle any other type by converting to string
             else:
                 return str(response)
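To make the contract of parse_llm_output concrete, here is a short, hedged sketch of what the branches above imply, assuming agent is an initialized Agent instance; the inputs are illustrative only.

assert agent.parse_llm_output("plain text") == "plain text"
assert agent.parse_llm_output(["a", "b"]) == "a\nb"
assert agent.parse_llm_output({"foo": 1}) == '{"foo": 1}'
assert (
    agent.parse_llm_output(
        {"choices": [{"message": {"content": "hi"}}]}
    )
    == "hi"
)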
@@ -2758,13 +2763,11 @@ class Agent:
                 content=evaluated_response,
             )
 
-        # Sentiment analysis
        if self.sentiment_analyzer:
             logger.info("Analyzing sentiment...")
             self.sentiment_analysis_handler(response)
 
     def output_cleaner_op(self, response: str):
-        # Apply the cleaner function to the response
         if self.output_cleaner is not None:
             logger.info("Applying output cleaner to response.")

@@ -14,6 +14,7 @@ HistoryOutputType = Literal[
     "json",
     "all",
     "yaml",
+    "xml",  # Added XML as a valid output type
     # "dict-final",
     "dict-all-except-first",
     "str-all-except-first",
@@ -39,6 +40,9 @@ def history_output_formatter(
         return conversation.get_str()
     elif type == "yaml":
         return yaml.safe_dump(conversation.to_dict(), sort_keys=False)
+    elif type == "xml":
+        from swarms.utils.xml_utils import to_xml_string
+        return to_xml_string(conversation.to_dict(), root_tag="conversation")
     # elif type == "dict-final":
     #     return conversation.to_dict()
     elif type == "dict-all-except-first":

@@ -0,0 +1,40 @@
import xml.etree.ElementTree as ET
from typing import Any


def dict_to_xml(tag: str, d: dict) -> ET.Element:
    """Convert a dictionary to an XML Element."""
    elem = ET.Element(tag)
    for key, val in d.items():
        child = ET.Element(str(key))
        if isinstance(val, dict):
            # Merge the nested dict's children under this key's element
            child.extend(dict_to_xml(str(key), val))
        elif isinstance(val, list):
            for item in val:
                if isinstance(item, dict):
                    child.append(dict_to_xml("item", item))
                else:
                    item_elem = ET.Element("item")
                    item_elem.text = str(item)
                    child.append(item_elem)
        else:
            child.text = str(val)
        elem.append(child)
    return elem


def to_xml_string(data: Any, root_tag: str = "root") -> str:
    """Convert a dict or list to an XML string."""
    if isinstance(data, dict):
        elem = dict_to_xml(root_tag, data)
    elif isinstance(data, list):
        elem = ET.Element(root_tag)
        for item in data:
            if isinstance(item, dict):
                elem.append(dict_to_xml("item", item))
            else:
                item_elem = ET.Element("item")
                item_elem.text = str(item)
                elem.append(item_elem)
    else:
        elem = ET.Element(root_tag)
        elem.text = str(data)
    return ET.tostring(elem, encoding="unicode")
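A quick usage sketch of the new module (the output line is computed from the code above; no XML declaration or pretty-printing is added):

from swarms.utils.xml_utils import to_xml_string

xml = to_xml_string(
    {"name": "agent-1", "tags": ["xml", "yaml"]}, root_tag="config"
)
print(xml)
# <config><name>agent-1</name><tags><item>xml</item><item>yaml</item></tags></config>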

@@ -0,0 +1,45 @@
import pytest
from swarms.utils.xml_utils import dict_to_xml, to_xml_string
import xml.etree.ElementTree as ET


def test_dict_to_xml_simple():
    d = {"foo": "bar", "baz": 1}
    elem = dict_to_xml("root", d)
    xml_str = ET.tostring(elem, encoding="unicode")
    assert "<foo>bar</foo>" in xml_str
    assert "<baz>1</baz>" in xml_str


def test_dict_to_xml_nested():
    d = {"foo": {"bar": "baz"}}
    elem = dict_to_xml("root", d)
    xml_str = ET.tostring(elem, encoding="unicode")
    assert "<foo>" in xml_str and "<bar>baz</bar>" in xml_str


def test_dict_to_xml_list():
    d = {"items": [1, 2, 3]}
    elem = dict_to_xml("root", d)
    xml_str = ET.tostring(elem, encoding="unicode")
    assert xml_str.count("<item>") == 3
    assert "<item>1</item>" in xml_str


def test_to_xml_string_dict():
    d = {"foo": "bar"}
    xml = to_xml_string(d, root_tag="root")
    assert xml.startswith("<root>") and "<foo>bar</foo>" in xml


def test_to_xml_string_list():
    data = [{"a": 1}, {"b": 2}]
    xml = to_xml_string(data, root_tag="root")
    assert xml.startswith("<root>") and xml.count("<item>") == 2


def test_to_xml_string_scalar():
    xml = to_xml_string("hello", root_tag="root")
    assert xml == "<root>hello</root>"


def test_dict_to_xml_edge_cases():
    d = {"empty": [], "none": None, "bool": True}
    elem = dict_to_xml("root", d)
    xml_str = ET.tostring(elem, encoding="unicode")
    assert "<empty />" in xml_str or "<empty></empty>" in xml_str
    assert "<none>None</none>" in xml_str
    assert "<bool>True</bool>" in xml_str