reasoning duo

pull/792/head
Kye Gomez 1 month ago
parent 95abbf0e84
commit fe6a5fde2c

@@ -0,0 +1,12 @@
from swarms import SelfConsistencyAgent

agent = SelfConsistencyAgent(
    max_loops=1,
    model_name="gpt-4o-mini",
    system_prompt="You are a helpful assistant that can answer questions and help with tasks.",
    description="You are a helpful assistant that can answer questions and help with tasks.",
)

agent.run(
    "Create a comprehensive proof for the Birch and Swinnerton-Dyer Conjecture"
)

@@ -0,0 +1,158 @@
# ReasoningDuo
The ReasoningDuo class implements a dual-agent reasoning system that combines a reasoning agent and a main agent to provide well-thought-out responses to complex tasks. This architecture enables more robust and reliable outputs by separating the reasoning process from the final response generation.
## Class Overview
### Constructor Parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| model_name | str | "reasoning-agent-01" | Name identifier for the reasoning agent |
| description | str | "A highly intelligent..." | Description of the reasoning agent's capabilities |
| model_names | list[str] | ["gpt-4o-mini", "gpt-4o"] | Model names for the reasoning agent (index 0) and the main agent (index 1) |
| system_prompt | str | "You are a helpful..." | System prompt for the main agent |
### Methods
| Method | Parameters | Returns | Description |
|--------|------------|---------|-------------|
| run | task: str | str | Processes a single task through both agents |
| batched_run | tasks: List[str] | List[str] | Processes multiple tasks sequentially |
## Quick Start
```python
from swarms.agents.reasoning_duo import ReasoningDuo

# Initialize the ReasoningDuo
duo = ReasoningDuo(
    model_name="reasoning-agent-01",
    model_names=["gpt-4o-mini", "gpt-4o"]
)

# Run a single task
result = duo.run("Explain the concept of gravitational waves")

# Run multiple tasks
tasks = [
    "Calculate compound interest for $1000 over 5 years",
    "Explain quantum entanglement"
]
results = duo.batched_run(tasks)
```
## Examples
### 1. Mathematical Analysis
```python
duo = ReasoningDuo()
# Complex mathematical problem
math_task = """
Solve the following differential equation:
dy/dx + 2y = x^2, y(0) = 1
"""
solution = duo.run(math_task)
```
### 2. Physics Problem
```python
# Quantum mechanics problem
physics_task = """
Calculate the wavelength of an electron with kinetic energy of 50 eV
using the de Broglie relationship.
"""
result = duo.run(physics_task)
```
### 3. Financial Analysis
```python
# Complex financial analysis
finance_task = """
Calculate the Net Present Value (NPV) of a project with:
- Initial investment: $100,000
- Annual cash flows: $25,000 for 5 years
- Discount rate: 8%
"""
analysis = duo.run(finance_task)
```
## Advanced Usage
### Customizing Agent Behavior
You can customize both agents by modifying their initialization parameters:
```python
duo = ReasoningDuo(
    model_name="custom-reasoning-agent",
    description="Specialized financial analysis agent",
    model_names=["gpt-4o-mini", "gpt-4o"],
    system_prompt="You are a financial expert AI assistant..."
)
```
### Batch Processing with Progress Tracking
```python
tasks = [
    "Analyze market trends for tech stocks",
    "Calculate risk metrics for a portfolio",
    "Forecast revenue growth"
]

# Process multiple tasks; progress for each task is logged via loguru
results = duo.batched_run(tasks)
```
## Implementation Details
The ReasoningDuo uses a two-stage process:
1. **Reasoning Stage**: The reasoning agent analyzes the task and develops a structured approach
2. **Execution Stage**: The main agent uses the reasoning output to generate the final response
### Internal Architecture
```
Task Input → Reasoning Agent → Structured Analysis → Main Agent → Final Output
```
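In code, `run` chains the two stages roughly as follows (a condensed sketch of the flow in `swarms/agents/reasoning_duo.py`; logging omitted):

```python
# Sketch of the two-stage flow inside ReasoningDuo.run
def run(self, task: str) -> str:
    # Stage 1: the reasoning agent produces a structured analysis of the task
    reasoning = self.reasoning_agent.run(task)
    # Stage 2: the main agent answers, conditioned on that analysis
    return self.main_agent.run(f"Task: {task} \n\n Your thoughts: {reasoning}")
```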
## Best Practices

1. **Task Formulation**
    - Be specific and clear in task descriptions
    - Include relevant context and constraints
    - Break complex problems into smaller subtasks (see the sketch after this list)
2. **Performance Optimization**
    - Use `batched_run` for multiple related tasks
    - Monitor agent outputs for consistency
    - Adjust model parameters based on task complexity
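As a minimal sketch of both recommendations together, you might decompose one broad request into focused subtasks and hand them to `batched_run` (the subtasks below are illustrative):

```python
# Hypothetical decomposition of a broad analysis into focused subtasks
subtasks = [
    "Summarize the revenue drivers described in the quarterly report",
    "Compute quarter-over-quarter growth for each product line",
    "List the top three risks implied by those growth figures",
]

results = duo.batched_run(subtasks)  # one result per subtask, in order
for subtask, result in zip(subtasks, results):
    print(f"{subtask}\n{result}\n")
```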
## Error Handling
The ReasoningDuo emits built-in logs through the `loguru` library; each call to `run` or `batched_run` automatically records the incoming task and the main agent's output:

```python
from loguru import logger

# ReasoningDuo logs each task automatically; you can add your own records too
logger.info("Task processing started")
```
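To persist these records, one option is to attach a file sink before running any tasks (a minimal sketch; the filename is arbitrary):

```python
from loguru import logger

# Route INFO-and-above records, including ReasoningDuo's, to a file
logger.add("reasoning_duo.log", level="INFO")
```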
## Limitations
- Processing time may vary based on task complexity
- Model response quality depends on input clarity
- Resource usage scales with batch size

@@ -1,7 +1,7 @@
 from swarms.agents.i_agent import IterativeReflectiveExpansion

 agent = IterativeReflectiveExpansion(
-    max_iterations=3,
+    max_iterations=1,
 )

 agent.run("What is the 40th prime number?")

@@ -1,3 +1,4 @@
+import json
 from swarms.structs.malt import MALT

 malt = MALT(
@@ -6,7 +7,12 @@ malt = MALT(
 )

 malt.run(
-    task="Prove that the sum of the first n natural numbers is n(n+1)/2."
+    task="Please solve the following challenging mathematical problem: Prove that for any positive integer n, the sum of the first n positive odd integers equals n². Include a rigorous proof with clear steps and explanations."
 )

-print(malt.conversation.return_json())
+with open("conversation_output.json", "w") as json_file:
+    json.dump(
+        malt.conversation.return_messages_as_dictionary(),
+        json_file,
+        indent=4,
+    )

@@ -12,6 +12,7 @@ BASE_URL = "https://swarms-api-285321057562.us-east1.run.app"

 # Headers for secure API communication
 headers = {"x-api-key": API_KEY, "Content-Type": "application/json"}

+
 def create_financial_swarm(equity_data: str):
     """
     Constructs and triggers a full-stack financial swarm consisting of three agents:
@@ -37,7 +38,7 @@ def create_financial_swarm(equity_data: str):
             "max_loops": 1,
             "max_tokens": 4000,
             "temperature": 0.3,
-            "auto_generate_prompt": False
+            "auto_generate_prompt": False,
         },
         {
             "agent_name": "Risk Assessor",
@@ -53,7 +54,7 @@ def create_financial_swarm(equity_data: str):
             "max_loops": 1,
             "max_tokens": 3000,
             "temperature": 0.2,
-            "auto_generate_prompt": False
+            "auto_generate_prompt": False,
         },
         {
             "agent_name": "Market Advisor",
@@ -70,8 +71,8 @@ def create_financial_swarm(equity_data: str):
             "max_loops": 1,
             "max_tokens": 5000,
             "temperature": 0.3,
-            "auto_generate_prompt": False
-        }
+            "auto_generate_prompt": False,
+        },
     ],
     "max_loops": 1,
     "swarm_type": "SequentialWorkflow",

@@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"

 [tool.poetry]
 name = "swarms"
-version = "7.5.2"
+version = "7.5.3"
 description = "Swarms - TGSC"
 license = "MIT"
 authors = ["Kye Gomez <kye@apac.ai>"]

@@ -18,6 +18,7 @@ from swarms.agents.create_agents_from_yaml import (
 from swarms.agents.i_agent import IterativeReflectiveExpansion
 from swarms.agents.consistency_agent import SelfConsistencyAgent
+from swarms.agents.reasoning_duo import ReasoningDuo

 __all__ = [
     # "ToolAgent",
@@ -34,4 +35,5 @@ __all__ = [
     "create_agents_from_yaml",
     "IterativeReflectiveExpansion",
     "SelfConsistencyAgent",
+    "ReasoningDuo",
 ]

@@ -48,6 +48,7 @@ class SelfConsistencyAgent(Agent):
         return_dict: bool = False,
         return_json: bool = False,
         majority_voting_prompt: str = None,
+        eval: bool = False,
         **kwargs,
     ):
         """
@@ -70,6 +71,7 @@ class SelfConsistencyAgent(Agent):
         self.return_dict = return_dict
         self.return_json = return_json
         self.majority_voting_prompt = majority_voting_prompt
+        self.eval = eval

     def run(
         self, task: str, answer: str = None, *args, **kwargs
@@ -101,16 +103,17 @@ class SelfConsistencyAgent(Agent):
         self.conversation.add(role=self.agent_name, content=responses)

-        if answer is not None:
-            correct = self.check_responses_for_answer(
-                responses, answer
-            )
-
-            if not correct:
-                logger.info(
-                    "The answer is not correct. Please try again."
-                )
-                return None
+        if self.eval:
+            if answer is not None:
+                correct = self.check_responses_for_answer(
+                    responses, answer
+                )
+
+                if not correct:
+                    logger.info(
+                        "The answer is not correct. Please try again."
+                    )
+                    return None

         # Aggregation agent
         # final_answer = self.aggregation_agent(responses)
@@ -178,6 +181,18 @@ class SelfConsistencyAgent(Agent):
             )
             return False

+    def batched_run(
+        self, tasks: List[str], *args, **kwargs
+    ) -> List[str]:
+        """
+        Runs the agent in a batched manner.
+        """
+        responses = []
+        for task in tasks:
+            response = self.run(task, *args, **kwargs)
+            responses.append(response)
+        return responses
+

 # # Example usage:
 # if __name__ == "__main__":

@@ -1,9 +1,9 @@
-from concurrent.futures import ThreadPoolExecutor
 import json
 import os
 import subprocess
 import sys
 import time
+from concurrent.futures import ThreadPoolExecutor
 from typing import Any, Callable, Dict, List, Optional

 from loguru import logger

@@ -0,0 +1,84 @@
from typing import List

from loguru import logger

from swarms.prompts.reasoning_prompt import REASONING_PROMPT
from swarms.structs.agent import Agent


class ReasoningDuo:
    """
    ReasoningDuo is a class that encapsulates the functionality of two agents: a reasoning agent and a main agent.

    Attributes:
        model_name (str): The name of the model used for the reasoning agent.
        description (str): A description of the reasoning agent.
        model_names (list[str]): A list of model names for the agents.
        system_prompt (str): The system prompt for the main agent.
        reasoning_agent (Agent): An instance of the Agent class for reasoning tasks.
        main_agent (Agent): An instance of the Agent class for main tasks.
    """

    def __init__(
        self,
        model_name: str = "reasoning-agent-01",
        description: str = "A highly intelligent and thoughtful AI designed to provide accurate and well-reasoned answers to the user's questions.",
        model_names: list[str] = ["gpt-4o-mini", "gpt-4o"],
        system_prompt: str = "You are a helpful assistant that can answer questions and help with tasks.",
    ):
        self.model_name = model_name
        self.description = description

        # First agent: produces a structured analysis of the task
        self.reasoning_agent = Agent(
            agent_name="Reasoning Agent",
            description=description,
            system_prompt=REASONING_PROMPT,
            max_loops=1,
            model_name=model_names[0],
            dynamic_temperature_enabled=True,
        )

        # Second agent: produces the final answer, conditioned on the reasoning
        self.main_agent = Agent(
            agent_name="Main Agent",
            description=description,
            system_prompt=system_prompt,
            max_loops=1,
            model_name=model_names[1],
            dynamic_temperature_enabled=True,
        )

    def run(self, task: str) -> str:
        """
        Executes the reasoning and main agents on the provided task.

        Args:
            task (str): The task to be processed by the agents.

        Returns:
            str: The output from the main agent after processing the task.
        """
        logger.info(f"Running task: {task}")
        output_reasoner = self.reasoning_agent.run(task)

        output_main = self.main_agent.run(
            f"Task: {task} \n\n Your thoughts: {output_reasoner}"
        )

        logger.info(f"Output from main agent: {output_main}")
        return output_main

    def batched_run(self, tasks: List[str]) -> List[str]:
        """
        Executes the run method for a list of tasks.

        Args:
            tasks (list[str]): A list of tasks to be processed.

        Returns:
            list: A list of outputs from the main agent for each task.
        """
        outputs = []
        for task in tasks:
            logger.info(f"Processing task: {task}")
            outputs.append(self.run(task))
        return outputs

@@ -0,0 +1,9 @@
REASONING_PROMPT = """
This is a structured conversation between the User and the Assistant, where the User poses a question, and the Assistant is tasked with providing a comprehensive solution.
Before delivering the final answer, the Assistant must engage in a thorough reasoning process. This involves critically analyzing the question, considering various perspectives, and evaluating potential solutions. The Assistant should articulate this reasoning process clearly, allowing the User to understand the thought process behind the answer.
The reasoning process and the final answer should be distinctly enclosed within <think> </think> tags. For example, the format should be: <think> reasoning process here </think> for the reasoning, followed by <think> final answer here </think> for the answer.
It is essential to output multiple <think> </think> tags to reflect the depth of thought and exploration involved in addressing the task. The Assistant should strive to think deeply and thoroughly about the question, ensuring that all relevant aspects are considered before arriving at a conclusion.
"""

@@ -579,23 +579,31 @@ class Agent:
         # Telemetry Processor to log agent data
         log_agent_data(self.to_dict())

-        if self.llm is None and self.model_name is not None:
+        if self.llm is None:
             self.llm = self.llm_handling()

     def llm_handling(self):
         from swarms.utils.litellm_wrapper import LiteLLM

-        if self.llm_args is not None:
-            llm = LiteLLM(model_name=self.model_name, **self.llm_args)
-        else:
-            llm = LiteLLM(
-                model_name=self.model_name,
-                temperature=self.temperature,
-                max_tokens=self.max_tokens,
-            )
-        return llm
+        if self.model_name is None:
+            raise ValueError("Model name cannot be None")
+
+        try:
+            if self.llm_args is not None:
+                llm = LiteLLM(
+                    model_name=self.model_name, **self.llm_args
+                )
+            else:
+                llm = LiteLLM(
+                    model_name=self.model_name,
+                    temperature=self.temperature,
+                    max_tokens=self.max_tokens,
+                    system_prompt=self.system_prompt,
+                )
+            return llm
+        except Exception as e:
+            logger.error(f"Error in llm_handling: {e}")
+            return None

     def prepare_tools_list_dictionary(self):
         import json
@@ -1064,13 +1072,13 @@ class Agent:
                 self.short_memory.get_str()
             )

-            # Handle artifacts
-            if self.artifacts_on is True:
-                self.handle_artifacts(
-                    concat_strings(all_responses),
-                    self.artifacts_output_path,
-                    self.artifacts_file_extension,
-                )
+            # # Handle artifacts
+            # if self.artifacts_on is True:
+            #     self.handle_artifacts(
+            #         concat_strings(all_responses),
+            #         self.artifacts_output_path,
+            #         self.artifacts_file_extension,
+            #     )

             log_agent_data(self.to_dict())
             if self.autosave is True:
@@ -2413,8 +2421,8 @@ class Agent:
         if not isinstance(task, str):
             raise TypeError("Task must be a string")

-        if not task.strip():
-            raise ValueError("Task cannot be empty")
+        if task is None:
+            raise ValueError("Task cannot be None")

         # if self.llm is None:
         #     raise TypeError("LLM object cannot be None")

@@ -5,8 +5,9 @@ from typing import Any, Optional, Union

 import yaml

 from swarms.structs.base_structure import BaseStructure
 from typing import TYPE_CHECKING
+from swarms.utils.any_to_str import any_to_str
 from swarms.utils.formatter import formatter
+from swarms.utils.litellm_tokenizer import count_tokens

 if TYPE_CHECKING:
     from swarms.structs.agent import (
@@ -79,6 +80,7 @@ class Conversation(BaseStructure):
         auto_save: bool = True,
         save_as_yaml: bool = True,
         save_as_json_bool: bool = False,
+        token_count: bool = True,
         *args,
         **kwargs,
     ):
@@ -96,6 +98,7 @@ class Conversation(BaseStructure):
         self.auto_save = auto_save
         self.save_as_yaml = save_as_yaml
         self.save_as_json_bool = save_as_json_bool
+        self.token_count = token_count

         # If system prompt is not None, add it to the conversation history
         if self.system_prompt is not None:
@@ -128,17 +131,21 @@ class Conversation(BaseStructure):
         now = datetime.datetime.now()
         timestamp = now.strftime("%Y-%m-%d %H:%M:%S")

+        tokens = count_tokens(any_to_str(content))
+
         if isinstance(content, dict) or isinstance(content, list):
             message = {
                 "role": role,
                 "content": content,
+                "token_count": int(tokens),
             }
         else:
             message = {
                 "role": role,
-                "content": f"Time: {timestamp} \n{content}",
+                "content": f"Time: {timestamp} \n {content}",
+                "token_count": int(tokens),
             }

         self.conversation_history.append(message)
@@ -448,16 +455,17 @@ class Conversation(BaseStructure):
 # # Example usage
 # # conversation = Conversation()
-# conversation = Conversation()
+# conversation = Conversation(token_count=True)
 # conversation.add("user", "Hello, how are you?")
-# conversation.add(
-#     "assistant", {"name": "tool_1", "output": "Hello, how are you?"}
-# )
-# print(conversation.return_json())
-# # print(conversation.get_last_message_as_string())
-# # print(conversation.return_messages_as_list())
-# # conversation.add("assistant", "I am doing well, thanks.")
-# # # print(conversation.to_json())
+# conversation.add("assistant", "I am doing well, thanks.")
+# # conversation.add(
+# #     "assistant", {"name": "tool_1", "output": "Hello, how are you?"}
+# # )
+# # print(conversation.return_json())
+# # # print(conversation.get_last_message_as_string())
+# print(conversation.return_json())
+# # # conversation.add("assistant", "I am doing well, thanks.")
+# # # # print(conversation.to_json())
 # # print(type(conversation.to_dict()))
 # # print(conversation.to_yaml())

@@ -13,7 +13,7 @@ Potential Improvements:
 - Autonomously create the agents based on the task.
 - Feed verifier responses back into the creator to improve the proof.
 - Feed refiner responses back into the creator to improve the proof.
--
+- Feed majority voting responses back into the creator to improve the proof.

 This is a simplified implementation of the MALT orchestrator. The original implementation trains the models with dpo and sft.
@@ -21,7 +21,6 @@ Whereas this implementation uses the models as is.
 """

-import concurrent.futures
 from typing import List

 from loguru import logger
@@ -310,7 +309,7 @@ class MALT:
         )

         self.conversation.add(
-            role=self.majority_voting_agent.agent_name,
+            role=majority_voting_agent.agent_name,
             content=majority_voting_verified,
         )
@@ -348,6 +347,7 @@ class MALT:
             str or list or dict: The output from the conversation based on the specified return format.
         """
         task = task
+
         for i in range(self.max_loops):
             logger.info(f"Starting iteration {i+1}/{self.max_loops}")
             output = self.step(task, img, *args, **kwargs)
@@ -386,18 +386,3 @@ class MALT:

     def __repr__(self):
         return self.conversation.get_str()
-
-    def run_concurrently(self, tasks: List[str], *args, **kwargs):
-        """Executes a list of tasks using the main agent and processes the output through verifier and refiner agents.
-
-        Args:
-            tasks (list[str]): The list of tasks to be executed by the main agent.
-        """
-        logger.info("Running batch of tasks concurrently.")
-        logger.info(f"Number of tasks: {len(tasks)}")
-        with concurrent.futures.ThreadPoolExecutor() as executor:
-            futures = [
-                executor.submit(self.run, task, *args, **kwargs)
-                for task in tasks
-            ]
-        return concurrent.futures.as_completed(futures)
