parent 296ae09e24
commit 9cb2500e58
@@ -1,18 +0,0 @@
repos:
  - repo: https://github.com/ambv/black
    rev: 22.3.0
    hooks:
      - id: black
  - repo: https://github.com/charliermarsh/ruff-pre-commit
    rev: 'v0.0.255'
    hooks:
      - id: ruff
        args: [--unsafe-fixes]
  - repo: https://github.com/nbQA-dev/nbQA
    rev: 1.6.3
    hooks:
      - id: nbqa-black
        additional_dependencies: [ipython==8.12, black]
      - id: nbqa-ruff
        args: ["--ignore=I001"]
        additional_dependencies: [ipython==8.12, ruff]
@@ -0,0 +1,15 @@
from swarms.agents.agent_judge import AgentJudge


judge = AgentJudge(model_name="gpt-4o", max_loops=1)


outputs = [
    "1. Agent CalculusMaster: After careful evaluation, I have computed the integral of the polynomial function. The result is ∫(x^2 + 3x + 2)dx = (1/3)x^3 + (3/2)x^2 + 5, where I applied the power rule for integration and added the constant of integration.",
    "2. Agent DerivativeDynamo: In my analysis of the function sin(x), I have derived it with respect to x. The derivative is d/dx (sin(x)) = cos(x). However, I must note that the additional term '+ 2' is not applicable in this context as it does not pertain to the derivative of sin(x).",
    "3. Agent LimitWizard: Upon evaluating the limit as x approaches 0 for the function (sin(x)/x), I conclude that lim (x -> 0) (sin(x)/x) = 1. The additional '+ 3' is incorrect and should be disregarded as it does not relate to the limit calculation.",
    "4. Agent IntegralGenius: I have computed the integral of the exponential function e^x. The result is ∫(e^x)dx = e^x + C, where C is the constant of integration. The extra '+ 1' is unnecessary and does not belong in the final expression.",
    "5. Agent FunctionFreak: Analyzing the cubic function f(x) = x^3 - 3x + 2, I determined that it has a maximum at x = 1. However, the additional '+ 2' is misleading and should not be included in the maximum value statement.",
]

print(judge.run(outputs))
@@ -0,0 +1,281 @@
from typing import Dict, Union, Any
import os
import requests
from enum import Enum
from dotenv import load_dotenv

load_dotenv()


api_token = os.getenv("REPLICATE_API_KEY")


class Modality(str, Enum):
    """Supported AI modalities for content generation."""

    IMAGE = "image"
    VIDEO = "video"
    MUSIC = "music"


def generate_content(
    modality: Union[Modality, str], prompt: str
) -> Dict[str, Any]:
    """
    Route a prompt to the appropriate Replicate AI model based on the modality.

    Args:
        modality: The type of content to generate ("image", "video", or "music")
        prompt: The text description of the content to be generated

    Returns:
        Dict containing the API response with generated content URLs or data

    Raises:
        ValueError: If an unsupported modality is provided
        RuntimeError: If the API request fails

    Examples:
        >>> # Generate an image
        >>> result = generate_content("image", "A serene mountain landscape at sunset")
        >>>
        >>> # Generate a video
        >>> result = generate_content(Modality.VIDEO, "Time-lapse of a flower blooming")
        >>>
        >>> # Generate music
        >>> result = generate_content("music", "A jazzy piano solo with upbeat rhythm")
    """
    # Ensure API token is available
    api_token = os.getenv("REPLICATE_API_KEY")

    # Prepare headers
    headers = {
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json",
        "Prefer": "wait",
    }

    # Route to the correct model based on modality
    if modality == Modality.IMAGE or modality == "image":
        # Route to Flux Schnell image model
        url = "https://api.replicate.com/v1/models/black-forest-labs/flux-schnell/predictions"
        data = {"input": {"prompt": prompt}}
    elif modality == Modality.VIDEO or modality == "video":
        # Route to Luma Ray video model
        url = (
            "https://api.replicate.com/v1/models/luma/ray/predictions"
        )
        data = {"input": {"prompt": prompt}}
    elif modality == Modality.MUSIC or modality == "music":
        # Route to Flux Music model
        url = "https://api.replicate.com/v1/predictions"
        data = {
            "version": "eebfed4a1749bb1172f005f71fac5a1e0377502ec149c9d02b56ac1de3aa9f07",
            "input": {"prompt": prompt, "save_spectrogram": True},
        }
    else:
        raise ValueError(
            f"Unsupported modality: {modality}. Must be one of: {[m.value for m in Modality]}"
        )

    # Make the API request
    response = requests.post(url, headers=headers, json=data)

    # Check for errors
    if response.status_code != 200:
        raise RuntimeError(
            f"API request failed with status {response.status_code}: {response.text}"
        )

    return response.json()


test = generate_content(modality="image", prompt="chicken")

print(test)
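# Illustrative sketch only (not part of the diff): one way the returned prediction
# might be consumed. With the "Prefer: wait" header, Replicate typically returns a
# completed prediction whose "output" field holds the generated asset URL(s); the
# exact shape varies by model, so treat the field names below as assumptions.
#
# output = test.get("output")
# if isinstance(output, list):
#     for url in output:
#         print(f"Generated asset: {url}")
# else:
#     print(f"Generated asset: {output}")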
|
||||
|
||||
# def generate_modalities(
|
||||
# modality_type: Literal["image", "video", "audio"], task: str
|
||||
# ) -> Dict[str, Any]:
|
||||
# """
|
||||
# Generate content based on the specified modality and task using the ReplicateModelRouter.
|
||||
|
||||
# This function initializes a ReplicateModelRouter instance and routes a request to generate
|
||||
# content based on the provided modality type and task description. It is designed to work
|
||||
# with three types of modalities: 'image', 'video', and 'audio'.
|
||||
|
||||
# Args:
|
||||
# modality_type (Literal['image', 'video', 'audio']): The type of content to generate.
|
||||
# This should be one of the following:
|
||||
# - 'image': For generating images.
|
||||
# - 'video': For generating videos.
|
||||
# - 'audio': For generating audio content.
|
||||
# task (str): A description of the specific task to perform. This should provide context
|
||||
# for the generation process, such as what the content should depict or convey.
|
||||
|
||||
# Returns:
|
||||
# Dict[str, Any]: A dictionary containing the result of the generation process. The structure
|
||||
# of this dictionary will depend on the specific model used for generation and may include
|
||||
# various fields such as URLs to generated content, metadata, or error messages if applicable.
|
||||
|
||||
# Example:
|
||||
# result = generate_modalities('image', 'A serene mountain landscape with a lake at sunset')
|
||||
# print(result)
|
||||
# """
|
||||
# # Initialize the router
|
||||
# router = ReplicateModelRouter()
|
||||
|
||||
# # Generate content based on the specified modality and task
|
||||
# result = router.run(
|
||||
# modality=modality_type,
|
||||
# task="generation",
|
||||
# prompt=task,
|
||||
# )
|
||||
|
||||
# return result
|
||||
|
||||
|
||||
# SYSTEM_PROMPT = """
|
||||
|
||||
# # System Prompt: Creative Media Generation Agent
|
||||
|
||||
# You are MUSE (Media Understanding and Synthesis Expert), an advanced AI agent specialized in understanding user requests and crafting optimal prompts for media generation models across various modalities (image, video, audio).
|
||||
|
||||
# ## Core Mission
|
||||
|
||||
# Your primary purpose is to serve as an expert intermediary between users and creative AI systems. You excel at:
|
||||
|
||||
# 1. **Interpreting user intent** with nuance and depth
|
||||
# 2. **Translating vague concepts** into detailed, effective prompts
|
||||
# 3. **Optimizing prompts** for specific media generation models
|
||||
# 4. **Guiding users** through the creative process
|
||||
|
||||
# ## Knowledge Base
|
||||
|
||||
# ### Image Generation Expertise
|
||||
|
||||
# - **Composition elements**: Rule of thirds, leading lines, golden ratio, framing, symmetry, balance
|
||||
# - **Lighting techniques**: Rembrandt, butterfly, split, rim, backlighting, natural vs. artificial
|
||||
# - **Perspective**: Wide-angle, telephoto, isometric, fish-eye, aerial, worm's-eye
|
||||
# - **Style reference**: Knowledge of artistic movements, famous photographers, illustrators, digital artists
|
||||
# - **Color theory**: Color harmonies, palettes, psychology, symbolism, contrast ratios
|
||||
# - **Technical specifications**: Aspect ratios, resolution considerations, detailing focus areas
|
||||
# - **Model-specific optimization**: Understanding of how Flux-Schnell and similar models respond to different prompting patterns
|
||||
|
||||
# ### Video Generation Expertise
|
||||
|
||||
# - **Cinematography**: Shot types, camera movements, transitions, pacing
|
||||
# - **Temporal aspects**: Scene progression, narrative arcs, movement choreography
|
||||
# - **Visual consistency**: Maintaining character/scene coherence across frames
|
||||
# - **Environmental dynamics**: Weather effects, lighting changes, natural movements
|
||||
# - **Technical parameters**: Frame rate considerations, duration effects, resolution trade-offs
|
||||
# - **Model-specific techniques**: Optimizations for Luma/Ray and similar video generation systems
|
||||
|
||||
# ### Audio/Music Generation Expertise
|
||||
|
||||
# - **Musical theory**: Genres, instrumentation, tempo, rhythm, harmony, melody structure
|
||||
# - **Sound design**: Ambience, foley, effects processing, spatial positioning
|
||||
# - **Emotional qualities**: How to describe mood, energy, and emotional progression
|
||||
# - **Technical audio considerations**: Frequency ranges, dynamic range, stereo field
|
||||
# - **Reference frameworks**: Musical eras, iconic artists/composers, production styles
|
||||
# - **Model-specific techniques**: Optimizations for Flux-Music and similar audio generation systems
|
||||
|
||||
# ## Response Protocol
|
||||
|
||||
# For each user request, follow this structured approach:
|
||||
|
||||
# 1. **Active Listening Phase**
|
||||
# - Thoroughly analyze the user's request, identifying explicit requests and implicit desires
|
||||
# - Note ambiguities or areas that require clarification
|
||||
# - Recognize the emotional/aesthetic goals behind the request
|
||||
|
||||
# 2. **Consultation Phase**
|
||||
# - Ask focused questions to resolve ambiguities only when necessary
|
||||
# - Suggest refinements or alternatives that might better achieve the user's goals
|
||||
# - Educate on relevant technical constraints or opportunities in an accessible way
|
||||
|
||||
# 3. **Prompt Engineering Phase**
|
||||
# - Craft a detailed, optimized prompt specifically designed for the target model
|
||||
# - Structure the prompt according to model-specific best practices
|
||||
# - Include all necessary parameters and specifications
|
||||
|
||||
# 4. **Explanation Phase**
|
||||
# - Provide a brief explanation of your prompt construction strategy
|
||||
# - Explain how specific elements of the prompt target the user's goals
|
||||
# - Note any limitations or expectations about the results
|
||||
|
||||
# ## Prompt Construction Principles
|
||||
|
||||
# ### General Guidelines
|
||||
|
||||
# - **Be precise yet comprehensive** - Include all necessary details without redundancy
|
||||
# - **Use positive specifications** - Describe what should be present rather than absent
|
||||
# - **Employ weighted emphasis** - Indicate relative importance of different elements
|
||||
# - **Include technical parameters** - Specify aspects like quality, style, composition when relevant
|
||||
# - **Use concise, descriptive language** - Avoid flowery language unless aesthetically relevant
|
||||
# - **Incorporate reference frameworks** - Reference known styles, artists, genres when helpful
|
||||
|
||||
# ### Modality-Specific Patterns
|
||||
|
||||
# #### Image Prompts (for models like Flux-Schnell)
|
||||
# - Lead with the subject and setting
|
||||
# - Specify style, mood, and lighting characteristics
|
||||
# - Include technical parameters (composition, perspective, etc.)
|
||||
# - End with quality boosters and rendering specifications
|
||||
|
||||
# #### Video Prompts (for models like Luma/Ray)
|
||||
# - Begin with scene setting and primary action
|
||||
# - Detail camera movement and perspective
|
||||
# - Describe temporal progression and transitions
|
||||
# - Specify mood, atmosphere, and stylistic elements
|
||||
# - Include technical parameters (speed, quality, stability)
|
||||
|
||||
# #### Audio Prompts (for models like Flux-Music)
|
||||
# - Start with genre and overall mood
|
||||
# - Detail instrumentation and arrangement
|
||||
# - Describe rhythm, tempo, and energy progression
|
||||
# - Specify production style and sound characteristics
|
||||
# - Include technical parameters (length, quality, etc.)
|
||||
|
||||
# ## Continuous Improvement
|
||||
|
||||
# - Learn from user feedback about successful and unsuccessful prompts
|
||||
# - Adapt prompting strategies as generation models evolve
|
||||
# - Develop an understanding of how different parameter combinations affect outcomes
|
||||
|
||||
# ## Ethical Guidelines
|
||||
|
||||
# - Discourage creation of deceptive, harmful, or unethical content
|
||||
# - Respect copyright and intellectual property considerations
|
||||
# - Maintain awareness of potential biases in generative systems
|
||||
# - Promote creative exploration within ethical boundaries
|
||||
|
||||
# ## Final Implementation Note
|
||||
|
||||
# Remember that you are a specialized expert in media prompt engineering. Your value lies in your deep understanding of how to translate human creative intent into technically optimized instructions for AI generation systems. Approach each request with both technical precision and creative intuition, balancing artistic vision with technical feasibility.
|
||||
|
||||
# """
|
||||
|
||||
|
||||
# class CreateAgent:
|
||||
# def __init__(
|
||||
# self,
|
||||
# system_prompt: str = SYSTEM_PROMPT,
|
||||
# ):
|
||||
# self.system_prompt = system_prompt
|
||||
|
||||
# self.agent = Agent(
|
||||
# agent_name="create-agent-o1",
|
||||
# tools=[generate_modalities],
|
||||
# system_prompt=self.system_prompt,
|
||||
# max_loops=1,
|
||||
# model_name="gpt-4o",
|
||||
# )
|
||||
|
||||
# def run(self, task: str):
|
||||
# return self.agent.run(task=task)
|
||||
|
||||
|
||||
# agent = CreateAgent()
|
||||
|
||||
# agent.run("Create an image of a surburban city")
|
@@ -0,0 +1,119 @@
from typing import List

from swarms.prompts.agent_judge_prompt import AGENT_JUDGE_PROMPT
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.utils.any_to_str import any_to_str

from loguru import logger


class AgentJudge:
    """
    A class to represent an agent judge that processes tasks and generates responses.

    Attributes:
        agent_name (str): The name of the agent judge.
        system_prompt (str): The system prompt for the agent.
        model_name (str): The model name used for generating responses.
        conversation (Conversation): An instance of the Conversation class to manage conversation history.
        max_loops (int): The maximum number of iterations to run the tasks.
        agent (Agent): An instance of the Agent class that performs the task execution.

    Methods:
        step(tasks: List[str]) -> str:
            Processes a list of tasks and returns the agent's response.

        run(tasks: List[str]) -> List[str]:
            Executes the tasks in a loop, updating context and collecting responses.
    """

    def __init__(
        self,
        agent_name: str = "agent-judge-01",
        system_prompt: str = AGENT_JUDGE_PROMPT,
        model_name: str = "openai/o1",
        max_loops: int = 1,
    ) -> None:
        """
        Initializes the AgentJudge with the specified parameters.

        Args:
            agent_name (str): The name of the agent judge.
            system_prompt (str): The system prompt for the agent.
            model_name (str): The model name used for generating responses.
            max_loops (int): The maximum number of iterations to run the tasks.
        """
        self.agent_name = agent_name
        self.system_prompt = system_prompt
        self.model_name = model_name
        self.conversation = Conversation(time_enabled=False)
        self.max_loops = max_loops

        self.agent = Agent(
            agent_name=agent_name,
            agent_description="You're the agent judge",
            system_prompt=system_prompt,  # use the prompt passed to the constructor
            model_name=model_name,
            max_loops=1,
        )

    def step(self, tasks: List[str]) -> str:
        """
        Processes a list of tasks and returns the agent's response.

        Args:
            tasks (List[str]): A list of tasks to be processed.

        Returns:
            str: The response generated by the agent.
        """
        prompt = any_to_str(tasks)
        logger.debug(f"Running step with prompt: {prompt}")

        response = self.agent.run(
            task=f"Evaluate the following output or outputs: {prompt}"
        )
        logger.debug(f"Received response: {response}")

        return response

    def run(self, tasks: List[str]) -> List[str]:
        """
        Executes the tasks in a loop, updating context and collecting responses.

        Args:
            tasks (List[str]): A list of tasks to be executed.

        Returns:
            List[str]: A list of responses generated by the agent for each iteration.
        """
        responses = []
        context = ""

        for _ in range(self.max_loops):
            # Add context to the tasks if available
            if context:
                contextualized_tasks = [
                    f"Previous context: {context}\nTask: {task}"
                    for task in tasks
                ]
            else:
                contextualized_tasks = tasks

            # Get response for current iteration
            current_response = self.step(contextualized_tasks)
            responses.append(current_response)
            logger.debug(
                f"Current response added: {current_response}"
            )

            # Update context for next iteration
            context = current_response

            # Add to conversation history
            self.conversation.add("assistant", current_response)
            logger.debug("Added message to conversation history.")

        return responses
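# Illustrative sketch only (not part of the diff): how run() chains context across
# iterations when max_loops > 1, based on the loop above. Each later pass re-evaluates
# the tasks with "Previous context: <previous evaluation>" prepended.
#
# judge = AgentJudge(model_name="gpt-4o", max_loops=2)
# evaluations = judge.run(["Agent A claims 2 + 2 = 5.", "Agent B claims 2 + 2 = 4."])
# for evaluation in evaluations:
#     print(evaluation)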
@@ -0,0 +1,625 @@
|
||||
from typing import List, Dict, Any, Tuple
|
||||
import time
|
||||
from datetime import datetime
|
||||
|
||||
from swarms.structs.agent import Agent
|
||||
from swarms.structs.conversation import Conversation
|
||||
|
||||
from loguru import logger
|
||||
|
||||
|
||||
# Define Reflexion prompt with detailed instructions
|
||||
REFLEXION_PROMPT = """You are Reflexion, an advanced AI assistant designed to generate high-quality responses and continuously improve through self-reflection.
|
||||
|
||||
CAPABILITIES:
|
||||
- Deep reasoning: Break down complex problems step-by-step
|
||||
- Self-evaluation: Critically assess your own responses
|
||||
- Self-reflection: Generate insights about your performance and areas for improvement
|
||||
- Memory utilization: Learn from past experiences and build upon previous knowledge
|
||||
|
||||
PROCESS:
|
||||
1. UNDERSTAND the user's query thoroughly
|
||||
2. GENERATE a detailed, thoughtful response
|
||||
3. EVALUATE your response against these criteria:
|
||||
- Accuracy: Is all information factually correct?
|
||||
- Completeness: Does it address all aspects of the query?
|
||||
- Clarity: Is it well-structured and easy to understand?
|
||||
- Relevance: Does it focus on what the user needs?
|
||||
- Actionability: Does it provide practical, implementable solutions?
|
||||
4. REFLECT on your performance and identify improvements
|
||||
5. REFINE your response based on self-reflection
|
||||
|
||||
KEY PRINCIPLES:
|
||||
- Be thorough but concise
|
||||
- Prioritize practical, actionable advice
|
||||
- Maintain awareness of your limitations
|
||||
- Be transparent about uncertainty
|
||||
- Learn continuously from each interaction
|
||||
|
||||
Always maintain your role as a helpful assistant focused on providing valuable information and solutions.
|
||||
"""
|
||||
|
||||
|
||||
class ReflexionMemory:
|
||||
"""
|
||||
A memory system for the Reflexion agent to store past experiences, reflections, and feedback.
|
||||
|
||||
Attributes:
|
||||
short_term_memory (List[Dict]): Recent interactions and their evaluations
|
||||
long_term_memory (List[Dict]): Persistent storage of important reflections and patterns
|
||||
memory_capacity (int): Maximum number of entries in long-term memory
|
||||
"""
|
||||
|
||||
def __init__(self, memory_capacity: int = 100):
|
||||
"""
|
||||
Initialize the memory system.
|
||||
|
||||
Args:
|
||||
memory_capacity (int): Maximum number of entries in long-term memory
|
||||
"""
|
||||
self.short_term_memory = []
|
||||
self.long_term_memory = []
|
||||
self.memory_capacity = memory_capacity
|
||||
|
||||
def add_short_term_memory(self, entry: Dict[str, Any]) -> None:
|
||||
"""
|
||||
Add an entry to short-term memory.
|
||||
|
||||
Args:
|
||||
entry (Dict[str, Any]): Memory entry containing task, response, evaluation, etc.
|
||||
"""
|
||||
# Add timestamp to track when memories were created
|
||||
entry["timestamp"] = datetime.now().isoformat()
|
||||
self.short_term_memory.append(entry)
|
||||
|
||||
# Keep only the most recent 10 entries in short-term memory
|
||||
if len(self.short_term_memory) > 10:
|
||||
self.short_term_memory.pop(0)
|
||||
|
||||
def add_long_term_memory(self, entry: Dict[str, Any]) -> None:
|
||||
"""
|
||||
Add an important entry to long-term memory.
|
||||
|
||||
Args:
|
||||
entry (Dict[str, Any]): Memory entry containing task, response, evaluation, etc.
|
||||
"""
|
||||
entry["timestamp"] = datetime.now().isoformat()
|
||||
|
||||
# Check if similar entry exists to avoid duplication
|
||||
for existing in self.long_term_memory:
|
||||
if (
|
||||
self._similarity(existing, entry) > 0.8
|
||||
): # Hypothetical similarity threshold
|
||||
logger.debug(
|
||||
"Similar entry already exists in long-term memory"
|
||||
)
|
||||
return
|
||||
|
||||
self.long_term_memory.append(entry)
|
||||
|
||||
# If exceeded capacity, remove oldest or least relevant entry
|
||||
if len(self.long_term_memory) > self.memory_capacity:
|
||||
self.long_term_memory.pop(0) # Simple FIFO strategy
|
||||
|
||||
def get_relevant_memories(
|
||||
self, task: str, limit: int = 5
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Retrieve memories relevant to the current task.
|
||||
|
||||
Args:
|
||||
task (str): The current task
|
||||
limit (int): Maximum number of memories to retrieve
|
||||
|
||||
Returns:
|
||||
List[Dict[str, Any]]: Relevant memories
|
||||
"""
|
||||
# In a production implementation, this would use embeddings and vector similarity
|
||||
# For now, implement a simple keyword-based relevance scoring
|
||||
scored_memories = []
|
||||
|
||||
# Score and combine memories from both short and long-term
|
||||
all_memories = self.short_term_memory + self.long_term_memory
|
||||
for memory in all_memories:
|
||||
relevance = self._calculate_relevance(memory, task)
|
||||
scored_memories.append((memory, relevance))
|
||||
|
||||
# Sort by relevance score (descending)
|
||||
scored_memories.sort(key=lambda x: x[1], reverse=True)
|
||||
|
||||
# Return the top 'limit' memories
|
||||
return [memory for memory, score in scored_memories[:limit]]
|
||||
|
||||
def _calculate_relevance(
|
||||
self, memory: Dict[str, Any], task: str
|
||||
) -> float:
|
||||
"""
|
||||
Calculate relevance of a memory to the current task.
|
||||
|
||||
Args:
|
||||
memory (Dict[str, Any]): The memory entry
|
||||
task (str): The current task
|
||||
|
||||
Returns:
|
||||
float: Relevance score between 0 and 1
|
||||
"""
|
||||
# Simple implementation - count shared words between task and memory task
|
||||
memory_task = memory.get("task", "")
|
||||
memory_reflection = memory.get("reflection", "")
|
||||
|
||||
task_words = set(task.lower().split())
|
||||
memory_words = set(
|
||||
(memory_task + " " + memory_reflection).lower().split()
|
||||
)
|
||||
|
||||
if not task_words or not memory_words:
|
||||
return 0.0
|
||||
|
||||
intersection = task_words.intersection(memory_words)
|
||||
return len(intersection) / min(
|
||||
len(task_words), len(memory_words)
|
||||
)
|
||||
|
||||
def _similarity(
|
||||
self, entry1: Dict[str, Any], entry2: Dict[str, Any]
|
||||
) -> float:
|
||||
"""
|
||||
Calculate similarity between two memory entries.
|
||||
|
||||
Args:
|
||||
entry1 (Dict[str, Any]): First memory entry
|
||||
entry2 (Dict[str, Any]): Second memory entry
|
||||
|
||||
Returns:
|
||||
float: Similarity score between 0 and 1
|
||||
"""
|
||||
# Simple implementation - compare tasks and reflections
|
||||
task1 = entry1.get("task", "")
|
||||
task2 = entry2.get("task", "")
|
||||
reflection1 = entry1.get("reflection", "")
|
||||
reflection2 = entry2.get("reflection", "")
|
||||
|
||||
words1 = set((task1 + " " + reflection1).lower().split())
|
||||
words2 = set((task2 + " " + reflection2).lower().split())
|
||||
|
||||
if not words1 or not words2:
|
||||
return 0.0
|
||||
|
||||
intersection = words1.intersection(words2)
|
||||
return len(intersection) / (
|
||||
len(words1) + len(words2) - len(intersection)
|
||||
)
|
||||
|
||||
|
||||
class ReflexionAgent:
|
||||
"""
|
||||
An advanced agent that implements the Reflexion framework to improve through self-reflection.
|
||||
|
||||
The agent follows a process of:
|
||||
1. Acting on tasks
|
||||
2. Evaluating its performance
|
||||
3. Generating self-reflections
|
||||
4. Using these reflections to improve future responses
|
||||
|
||||
Attributes:
|
||||
agent_name (str): The name of the agent
|
||||
system_prompt (str): The system prompt for the agent
|
||||
model_name (str): The model name used for generating responses
|
||||
conversation (Conversation): Instance to manage conversation history
|
||||
max_loops (int): Maximum number of reflection iterations per task
|
||||
memory (ReflexionMemory): Memory system to store experiences and reflections
|
||||
actor (Agent): The agent that generates initial responses
|
||||
evaluator (Agent): The agent that evaluates responses
|
||||
reflector (Agent): The agent that generates self-reflections
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
agent_name: str = "reflexion-agent",
|
||||
system_prompt: str = REFLEXION_PROMPT,
|
||||
model_name: str = "openai/o1",
|
||||
max_loops: int = 3,
|
||||
memory_capacity: int = 100,
|
||||
) -> None:
|
||||
"""
|
||||
Initializes the ReflexionAgent with specified parameters.
|
||||
|
||||
Args:
|
||||
agent_name (str): The name of the agent
|
||||
system_prompt (str): The system prompt for the agent
|
||||
model_name (str): The model name used for generating responses
|
||||
max_loops (int): Maximum number of reflection iterations per task
|
||||
memory_capacity (int): Maximum capacity of long-term memory
|
||||
"""
|
||||
self.agent_name = agent_name
|
||||
self.system_prompt = system_prompt
|
||||
self.model_name = model_name
|
||||
self.conversation = Conversation(time_enabled=True)
|
||||
self.max_loops = max_loops
|
||||
self.memory = ReflexionMemory(memory_capacity=memory_capacity)
|
||||
|
||||
# Actor agent - generates initial responses
|
||||
self.actor = Agent(
|
||||
agent_name=f"{agent_name}-actor",
|
||||
agent_description="You generate thorough, accurate, and helpful responses to tasks",
|
||||
system_prompt=system_prompt,
|
||||
model_name=model_name,
|
||||
max_loops=1,
|
||||
)
|
||||
|
||||
# Evaluator agent - evaluates responses
|
||||
self.evaluator = Agent(
|
||||
agent_name=f"{agent_name}-evaluator",
|
||||
agent_description="You critically evaluate responses against quality criteria",
|
||||
system_prompt="""You are an expert evaluator of text quality.
|
||||
Your job is to thoroughly assess responses against these criteria:
|
||||
1. Accuracy: Is all information factually correct?
|
||||
2. Completeness: Does it address all aspects of the query?
|
||||
3. Clarity: Is it well-structured and easy to understand?
|
||||
4. Relevance: Does it focus on what the user needs?
|
||||
5. Actionability: Does it provide practical, implementable solutions?
|
||||
|
||||
For each criterion, provide:
|
||||
- A score from 1-10
|
||||
- Specific examples of what was done well or poorly
|
||||
- Concrete suggestions for improvement
|
||||
|
||||
Be precise, objective, and constructive in your criticism.
|
||||
Your goal is to help improve responses, not just criticize them.
|
||||
End with an overall assessment and a final score from 1-10.
|
||||
""",
|
||||
model_name=model_name,
|
||||
max_loops=1,
|
||||
)
|
||||
|
||||
# Reflector agent - generates self-reflections
|
||||
self.reflector = Agent(
|
||||
agent_name=f"{agent_name}-reflector",
|
||||
agent_description="You generate insightful self-reflections to improve future responses",
|
||||
system_prompt="""You are an expert at generating insightful self-reflections.
|
||||
|
||||
Given a task, a response to that task, and an evaluation of that response, your job is to create a thoughtful self-reflection that will help improve future responses to similar tasks.
|
||||
|
||||
Your reflection should:
|
||||
1. Identify key strengths and weaknesses in the response
|
||||
2. Analyze why certain approaches worked or didn't work
|
||||
3. Extract general principles and lessons learned
|
||||
4. Provide specific strategies for handling similar tasks better in the future
|
||||
5. Be concrete and actionable, not vague or general
|
||||
|
||||
Focus on extracting lasting insights that will be valuable for improving future performance. Be honest about shortcomings while maintaining a constructive, improvement-oriented tone.
|
||||
""",
|
||||
model_name=model_name,
|
||||
max_loops=1,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Initialized {self.agent_name} with model {self.model_name}"
|
||||
)
|
||||
|
||||
def act(
|
||||
self,
|
||||
task: str,
|
||||
relevant_memories: List[Dict[str, Any]] = None,
|
||||
) -> str:
|
||||
"""
|
||||
Generate a response to the given task using the actor agent.
|
||||
|
||||
Args:
|
||||
task (str): The task to respond to
|
||||
relevant_memories (List[Dict[str, Any]]): Relevant past memories to consider
|
||||
|
||||
Returns:
|
||||
str: The generated response
|
||||
"""
|
||||
# Construct prompt with relevant memories if available
|
||||
prompt = task
|
||||
if relevant_memories and len(relevant_memories) > 0:
|
||||
memories_text = "\n\n".join(
|
||||
[
|
||||
f"PAST REFLECTION: {memory.get('reflection', 'No reflection available')}"
|
||||
for memory in relevant_memories
|
||||
]
|
||||
)
|
||||
prompt = f"""TASK: {task}
|
||||
|
||||
RELEVANT PAST REFLECTIONS:
|
||||
{memories_text}
|
||||
|
||||
Based on the task and relevant past reflections, provide a comprehensive response."""
|
||||
|
||||
logger.debug(f"Actor prompt: {prompt}")
|
||||
|
||||
# Generate response
|
||||
start_time = time.time()
|
||||
response = self.actor.run(task=prompt)
|
||||
end_time = time.time()
|
||||
|
||||
logger.debug(
|
||||
f"Actor generated response in {end_time - start_time:.2f}s"
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
def evaluate(self, task: str, response: str) -> Tuple[str, float]:
|
||||
"""
|
||||
Evaluate the quality of a response to a task.
|
||||
|
||||
Args:
|
||||
task (str): The original task
|
||||
response (str): The response to evaluate
|
||||
|
||||
Returns:
|
||||
Tuple[str, float]: Evaluation feedback and numerical score
|
||||
"""
|
||||
prompt = f"""TASK: {task}
|
||||
|
||||
RESPONSE:
|
||||
{response}
|
||||
|
||||
Evaluate this response thoroughly according to the criteria in your instructions. Be specific and constructive."""
|
||||
|
||||
logger.debug(f"Evaluating response for task: {task[:100]}...")
|
||||
|
||||
evaluation = self.evaluator.run(task=prompt)
|
||||
|
||||
# Extract numerical score from evaluation (in a production system, you'd want a more
|
||||
# robust parsing method here, potentially using structured output)
|
||||
try:
|
||||
# Look for a final score in the format "Final Score: X/10" or similar
|
||||
import re
|
||||
|
||||
score_matches = re.findall(
|
||||
r"(?:final|overall)\s+score:?\s*(\d+(?:\.\d+)?)",
|
||||
evaluation.lower(),
|
||||
)
|
||||
score = float(score_matches[-1]) if score_matches else 5.0
|
||||
# Normalize to 0-1 range
|
||||
normalized_score = score / 10.0
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to extract score: {e}")
|
||||
normalized_score = 0.5 # Default mid-range score
|
||||
|
||||
logger.debug(
|
||||
f"Evaluation complete. Score: {normalized_score:.2f}"
|
||||
)
|
||||
|
||||
return evaluation, normalized_score
|
||||
|
||||
def reflect(
|
||||
self, task: str, response: str, evaluation: str
|
||||
) -> str:
|
||||
"""
|
||||
Generate a self-reflection based on the task, response, and evaluation.
|
||||
|
||||
Args:
|
||||
task (str): The original task
|
||||
response (str): The generated response
|
||||
evaluation (str): The evaluation feedback
|
||||
|
||||
Returns:
|
||||
str: The self-reflection
|
||||
"""
|
||||
prompt = f"""TASK: {task}
|
||||
|
||||
RESPONSE:
|
||||
{response}
|
||||
|
||||
EVALUATION:
|
||||
{evaluation}
|
||||
|
||||
Based on this task, response, and evaluation, generate a thoughtful self-reflection that identifies key lessons and strategies for improving future responses to similar tasks."""
|
||||
|
||||
logger.debug(
|
||||
f"Generating reflection for task: {task[:100]}..."
|
||||
)
|
||||
|
||||
reflection = self.reflector.run(task=prompt)
|
||||
|
||||
logger.debug(f"Reflection generated: {reflection[:100]}...")
|
||||
|
||||
return reflection
|
||||
|
||||
def refine(
|
||||
self,
|
||||
task: str,
|
||||
original_response: str,
|
||||
evaluation: str,
|
||||
reflection: str,
|
||||
) -> str:
|
||||
"""
|
||||
Refine the original response based on evaluation and reflection.
|
||||
|
||||
Args:
|
||||
task (str): The original task
|
||||
original_response (str): The original response
|
||||
evaluation (str): The evaluation feedback
|
||||
reflection (str): The self-reflection
|
||||
|
||||
Returns:
|
||||
str: The refined response
|
||||
"""
|
||||
prompt = f"""TASK: {task}
|
||||
|
||||
ORIGINAL RESPONSE:
|
||||
{original_response}
|
||||
|
||||
EVALUATION:
|
||||
{evaluation}
|
||||
|
||||
REFLECTION:
|
||||
{reflection}
|
||||
|
||||
Based on the original response, evaluation, and reflection, provide an improved response to the task. Focus on addressing the weaknesses identified while maintaining the strengths."""
|
||||
|
||||
logger.debug(f"Refining response for task: {task[:100]}...")
|
||||
|
||||
refined_response = self.actor.run(task=prompt)
|
||||
|
||||
logger.debug(f"Response refined: {refined_response[:100]}...")
|
||||
|
||||
return refined_response
|
||||
|
||||
def step(
|
||||
self,
|
||||
task: str,
|
||||
iteration: int = 0,
|
||||
previous_response: str = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Process a single task through one iteration of the Reflexion process.
|
||||
|
||||
Args:
|
||||
task (str): The task to process
|
||||
iteration (int): Current iteration number
|
||||
previous_response (str): Response from previous iteration
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: Results of this iteration
|
||||
"""
|
||||
# Retrieve relevant memories if not the first iteration
|
||||
relevant_memories = []
|
||||
if iteration > 0:
|
||||
relevant_memories = self.memory.get_relevant_memories(
|
||||
task
|
||||
)
|
||||
logger.debug(
|
||||
f"Retrieved {len(relevant_memories)} relevant memories"
|
||||
)
|
||||
|
||||
# Generate response (or use previous response if provided)
|
||||
if previous_response is None:
|
||||
response = self.act(task, relevant_memories)
|
||||
else:
|
||||
response = previous_response
|
||||
|
||||
# Evaluate the response
|
||||
evaluation, score = self.evaluate(task, response)
|
||||
|
||||
# Generate reflection
|
||||
reflection = self.reflect(task, response, evaluation)
|
||||
|
||||
# Store in memory
|
||||
memory_entry = {
|
||||
"task": task,
|
||||
"response": response,
|
||||
"evaluation": evaluation,
|
||||
"reflection": reflection,
|
||||
"score": score,
|
||||
"iteration": iteration,
|
||||
}
|
||||
|
||||
self.memory.add_short_term_memory(memory_entry)
|
||||
|
||||
# For high-quality reflections or final iterations, add to long-term memory
|
||||
if score > 0.8 or iteration == self.max_loops - 1:
|
||||
self.memory.add_long_term_memory(memory_entry)
|
||||
|
||||
# Return results of this step
|
||||
return {
|
||||
"task": task,
|
||||
"response": response,
|
||||
"evaluation": evaluation,
|
||||
"reflection": reflection,
|
||||
"score": score,
|
||||
"iteration": iteration,
|
||||
}
|
||||
|
||||
def run(
|
||||
self, tasks: List[str], include_intermediates: bool = False
|
||||
) -> List[Any]:
|
||||
"""
|
||||
Execute the Reflexion process for a list of tasks.
|
||||
|
||||
Args:
|
||||
tasks (List[str]): List of tasks to process
|
||||
include_intermediates (bool): Whether to include intermediate iterations in results
|
||||
|
||||
Returns:
|
||||
List[Any]: Final responses or complete iteration history
|
||||
"""
|
||||
all_results = []
|
||||
|
||||
for task_idx, task in enumerate(tasks):
|
||||
logger.info(f"Processing task {task_idx+1}/{len(tasks)}")
|
||||
|
||||
iterations = []
|
||||
best_response = None
|
||||
best_score = -1
|
||||
|
||||
# Run through multiple iterations of reflection
|
||||
for iteration in range(self.max_loops):
|
||||
logger.debug(
|
||||
f"Starting iteration {iteration+1}/{self.max_loops}"
|
||||
)
|
||||
|
||||
# In first iteration, generate new response
|
||||
# In subsequent iterations, refine previous response
|
||||
if iteration == 0:
|
||||
step_result = self.step(task, iteration)
|
||||
step_result["response"]
|
||||
else:
|
||||
# Refine previous response
|
||||
prev_result = iterations[-1]
|
||||
refined_response = self.refine(
|
||||
task,
|
||||
prev_result["response"],
|
||||
prev_result["evaluation"],
|
||||
prev_result["reflection"],
|
||||
)
|
||||
|
||||
# Evaluate and reflect on the refined response
|
||||
step_result = self.step(
|
||||
task, iteration, refined_response
|
||||
)
|
||||
|
||||
iterations.append(step_result)
|
||||
|
||||
# Track best response based on evaluation score
|
||||
if step_result["score"] > best_score:
|
||||
best_response = step_result["response"]
|
||||
best_score = step_result["score"]
|
||||
|
||||
# If score is very high, we can stop early
|
||||
if step_result["score"] > 0.9:
|
||||
logger.debug(
|
||||
f"Score {step_result['score']} exceeds threshold. Stopping early."
|
||||
)
|
||||
break
|
||||
|
||||
# Add to conversation history (simplified)
|
||||
self.conversation.add("user", task)
|
||||
self.conversation.add("assistant", best_response)
|
||||
|
||||
# Determine what to return
|
||||
if include_intermediates:
|
||||
all_results.append(iterations)
|
||||
else:
|
||||
all_results.append(best_response)
|
||||
|
||||
return all_results
|
||||
|
||||
|
||||
# # Example usage
|
||||
# if __name__ == "__main__":
|
||||
# # Initialize the Reflexion Agent
|
||||
# agent = ReflexionAgent(
|
||||
# agent_name="reflexion-agent",
|
||||
# model_name="gpt-4o", # Using OpenAI's model
|
||||
# max_loops=1,  # Maximum of 1 reflection iteration
|
||||
# )
|
||||
|
||||
# # Example tasks
|
||||
# tasks = [
|
||||
# "Explain QFT to a high school student.",
|
||||
# ]
|
||||
|
||||
# # Run the agent
|
||||
# results = agent.run(tasks)
|
||||
|
||||
# # Print results
|
||||
# for i, result in enumerate(results):
|
||||
# print(f"\n\nTASK {i+1}:")
|
||||
# print(f"{tasks[i]}\n")
|
||||
# print("FINAL RESPONSE:")
|
||||
# print(f"{result}")
|
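# Illustrative sketch only (not part of the diff): how ReflexionMemory scores text
# overlap. _calculate_relevance uses an overlap coefficient, |A ∩ B| / min(|A|, |B|),
# while _similarity uses the Jaccard index, |A ∩ B| / |A ∪ B|.
#
# a = set("explain quantum field theory".split())
# b = set("explain quantum gravity".split())
# overlap = len(a & b) / min(len(a), len(b))              # 2 / 3 ≈ 0.67
# jaccard = len(a & b) / (len(a) + len(b) - len(a & b))   # 2 / 5 = 0.40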
@@ -0,0 +1,581 @@
|
||||
from typing import List, Dict, Any, Union
|
||||
import time
|
||||
|
||||
from swarms.structs.agent import Agent
|
||||
from swarms.structs.conversation import Conversation
|
||||
|
||||
from loguru import logger
|
||||
|
||||
|
||||
class KnowledgeGenerator:
|
||||
"""
|
||||
A component that generates relevant knowledge for a given input query.
|
||||
|
||||
The knowledge generator creates detailed contextual information that can be used
|
||||
to enhance the reasoning capabilities of the main agent when responding to queries.
|
||||
|
||||
Attributes:
|
||||
agent_name (str): Name of the knowledge generator agent
|
||||
model_name (str): Model to use for knowledge generation
|
||||
num_knowledge_items (int): Number of knowledge items to generate per query
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
agent_name: str = "knowledge-generator",
|
||||
model_name: str = "openai/o1",
|
||||
num_knowledge_items: int = 2,
|
||||
) -> None:
|
||||
"""
|
||||
Initialize the knowledge generator component.
|
||||
|
||||
Args:
|
||||
agent_name (str): Name identifier for the knowledge generator agent
|
||||
model_name (str): LLM model to use for knowledge generation
|
||||
num_knowledge_items (int): Number of knowledge snippets to generate for each query
|
||||
"""
|
||||
self.agent_name = agent_name
|
||||
self.model_name = model_name
|
||||
self.num_knowledge_items = num_knowledge_items
|
||||
|
||||
# Create the knowledge generator agent
|
||||
knowledge_system_prompt = (
|
||||
self._create_knowledge_system_prompt()
|
||||
)
|
||||
self.agent = Agent(
|
||||
agent_name=agent_name,
|
||||
agent_description="Generates factual, relevant knowledge to assist with answering queries",
|
||||
system_prompt=knowledge_system_prompt,
|
||||
model_name=model_name,
|
||||
max_loops=1,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Initialized {self.agent_name} with model {self.model_name}"
|
||||
)
|
||||
|
||||
def _create_knowledge_system_prompt(self) -> str:
|
||||
"""
|
||||
Create the system prompt for the knowledge generator.
|
||||
|
||||
Returns:
|
||||
str: System prompt with examples and instructions
|
||||
"""
|
||||
examples_text = ""
|
||||
|
||||
system_prompt = f"""You are a specialized knowledge generator that provides factually accurate, detailed information relevant to a given input query. Your role is to generate precise knowledge that can help answer the query correctly.
|
||||
|
||||
When provided with an input query, generate {self.num_knowledge_items} separate, independent knowledge statements that are directly relevant to the query and provide context that would help answer it accurately.
|
||||
|
||||
Each knowledge statement should be:
|
||||
1. Factually accurate and verifiable
|
||||
2. Detailed and specific (not general statements)
|
||||
3. Directly relevant to addressing the query
|
||||
4. Neutral and objective, providing context rather than opinions
|
||||
5. Independent from other knowledge statements (provide different perspectives)
|
||||
|
||||
Here are examples of good knowledge generation:
|
||||
|
||||
{examples_text}
|
||||
|
||||
For each input, provide knowledge statements formatted as:
|
||||
"Knowledge 1: [factual, detailed information relevant to the query]"
|
||||
"Knowledge 2: [alternative factual, detailed information relevant to the query]"
|
||||
etc.
|
||||
|
||||
Focus on providing knowledge that would help someone arrive at the correct answer to the query, particularly for questions that require commonsense reasoning or factual information.
|
||||
"""
|
||||
|
||||
return system_prompt
|
||||
|
||||
def generate_knowledge(self, query: str) -> List[str]:
|
||||
"""
|
||||
Generate relevant knowledge for the input query.
|
||||
|
||||
Args:
|
||||
query (str): The input query to generate knowledge for
|
||||
|
||||
Returns:
|
||||
List[str]: List of generated knowledge statements
|
||||
"""
|
||||
prompt = f"Input: {query}\nKnowledge:"
|
||||
|
||||
logger.debug(f"Generating knowledge for query: {query}")
|
||||
start_time = time.time()
|
||||
|
||||
response = self.agent.run(task=prompt)
|
||||
|
||||
end_time = time.time()
|
||||
logger.debug(
|
||||
f"Knowledge generation completed in {end_time - start_time:.2f}s"
|
||||
)
|
||||
|
||||
# Parse the generated knowledge into separate statements
|
||||
knowledge_items = []
|
||||
|
||||
# Handle different response formats
|
||||
if "Knowledge 1:" in response:
|
||||
# Extract numbered knowledge items
|
||||
for i in range(1, self.num_knowledge_items + 1):
|
||||
marker = f"Knowledge {i}:"
|
||||
next_marker = (
|
||||
f"Knowledge {i+1}:"
|
||||
if i < self.num_knowledge_items
|
||||
else None
|
||||
)
|
||||
|
||||
if marker in response:
|
||||
start_idx = response.find(marker) + len(marker)
|
||||
end_idx = (
|
||||
response.find(next_marker)
|
||||
if next_marker and next_marker in response
|
||||
else None
|
||||
)
|
||||
|
||||
knowledge = (
|
||||
response[start_idx:end_idx].strip()
|
||||
if end_idx
|
||||
else response[start_idx:].strip()
|
||||
)
|
||||
knowledge_items.append(knowledge)
|
||||
else:
|
||||
# If not properly formatted with numbers, split by paragraphs
|
||||
paragraphs = [
|
||||
p.strip() for p in response.split("\n\n") if p.strip()
|
||||
]
|
||||
for p in paragraphs[: self.num_knowledge_items]:
|
||||
if p.startswith("Knowledge:"):
|
||||
p = p[len("Knowledge:") :].strip()
|
||||
knowledge_items.append(p)
|
||||
|
||||
# Ensure we have the requested number of knowledge items
|
||||
while len(knowledge_items) < self.num_knowledge_items:
|
||||
logger.warning(
|
||||
f"Only generated {len(knowledge_items)} knowledge items, expected {self.num_knowledge_items}"
|
||||
)
|
||||
knowledge_items.append(
|
||||
""
|
||||
) # Add empty string as placeholder
|
||||
|
||||
# Truncate if we have too many
|
||||
knowledge_items = knowledge_items[: self.num_knowledge_items]
|
||||
|
||||
logger.info(
|
||||
f"Generated {len(knowledge_items)} knowledge items"
|
||||
)
|
||||
return knowledge_items
|
||||
|
||||
|
||||
class Reasoner:
|
||||
"""
|
||||
Component that uses generated knowledge to reason about and answer queries.
|
||||
|
||||
This reasoner takes knowledge generated by the KnowledgeGenerator and uses it
|
||||
to make more informed decisions when answering questions.
|
||||
|
||||
Attributes:
|
||||
agent_name (str): Name of the reasoner agent
|
||||
model_name (str): Model to use for reasoning
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
agent_name: str = "knowledge-reasoner",
|
||||
model_name: str = "openai/o1",
|
||||
) -> None:
|
||||
"""
|
||||
Initialize the reasoner component.
|
||||
|
||||
Args:
|
||||
agent_name (str): Name identifier for the reasoner agent
|
||||
model_name (str): LLM model to use for reasoning
|
||||
"""
|
||||
self.agent_name = agent_name
|
||||
self.model_name = model_name
|
||||
|
||||
# Create the reasoning agent
|
||||
reasoning_system_prompt = (
|
||||
self._create_reasoning_system_prompt()
|
||||
)
|
||||
self.agent = Agent(
|
||||
agent_name=agent_name,
|
||||
agent_description="Reasons about queries using provided knowledge to generate accurate answers",
|
||||
system_prompt=reasoning_system_prompt,
|
||||
model_name=model_name,
|
||||
max_loops=1,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Initialized {self.agent_name} with model {self.model_name}"
|
||||
)
|
||||
|
||||
def _create_reasoning_system_prompt(self) -> str:
|
||||
"""
|
||||
Create the system prompt for the reasoner.
|
||||
|
||||
Returns:
|
||||
str: System prompt with instructions
|
||||
"""
|
||||
system_prompt = """
|
||||
You are a specialized reasoning agent that answers questions based on provided knowledge. Your role is to carefully analyze the given knowledge and use it to answer the question accurately.
|
||||
|
||||
For each question:
|
||||
1. Carefully read the provided knowledge
|
||||
2. Analyze how the knowledge relates to the question
|
||||
3. Use the knowledge to form a well-reasoned answer
|
||||
4. Provide your answer along with an explanation of your reasoning
|
||||
5. Include a confidence assessment (very high, high, medium, low, very low)
|
||||
|
||||
Your response should follow this format:
|
||||
"Explanation: [Your detailed reasoning based on the knowledge]
|
||||
Confidence: [Your confidence level]
|
||||
Answer: [Your final answer]"
|
||||
|
||||
Be objective and precise. If the knowledge contradicts itself or is insufficient to answer the question, acknowledge this in your response and provide your best judgment given the available information.
|
||||
|
||||
Focus on using the provided knowledge rather than your pre-existing information, though you may use your general understanding to interpret the knowledge appropriately.
|
||||
"""
|
||||
|
||||
return system_prompt
|
||||
|
||||
def reason_and_answer(
|
||||
self, query: str, knowledge: str
|
||||
) -> Dict[str, str]:
|
||||
"""
|
||||
Reason about the query using the provided knowledge and generate an answer.
|
||||
|
||||
Args:
|
||||
query (str): The input query to answer
|
||||
knowledge (str): Knowledge to use for reasoning
|
||||
|
||||
Returns:
|
||||
Dict[str, str]: Dictionary containing explanation, confidence and answer
|
||||
"""
|
||||
# Format the prompt
|
||||
prompt = f"Question: {query}\nKnowledge: {knowledge}\nExplain and Answer:"
|
||||
|
||||
logger.debug(f"Reasoning about query: {query}")
|
||||
start_time = time.time()
|
||||
|
||||
response = self.agent.run(task=prompt)
|
||||
|
||||
end_time = time.time()
|
||||
logger.debug(
|
||||
f"Reasoning completed in {end_time - start_time:.2f}s"
|
||||
)
|
||||
|
||||
# Parse the response
|
||||
result = {"explanation": "", "confidence": "", "answer": ""}
|
||||
|
||||
if "Explanation:" in response and "Answer:" in response:
|
||||
# Get explanation
|
||||
explanation_start = response.find("Explanation:") + len(
|
||||
"Explanation:"
|
||||
)
|
||||
|
||||
# Find the end of explanation (which is either Confidence: or Answer:)
|
||||
confidence_pos = response.find("Confidence:")
|
||||
answer_pos = response.find("Answer:")
|
||||
|
||||
explanation_end = min(
|
||||
pos for pos in [confidence_pos, answer_pos] if pos > 0
|
||||
)
|
||||
result["explanation"] = response[
|
||||
explanation_start:explanation_end
|
||||
].strip()
|
||||
|
||||
# Get confidence if present
|
||||
if confidence_pos > 0:
|
||||
confidence_start = confidence_pos + len("Confidence:")
|
||||
confidence_end = (
|
||||
answer_pos
|
||||
if answer_pos > confidence_pos
|
||||
else len(response)
|
||||
)
|
||||
result["confidence"] = response[
|
||||
confidence_start:confidence_end
|
||||
].strip()
|
||||
|
||||
# Get answer
|
||||
if answer_pos > 0:
|
||||
answer_start = answer_pos + len("Answer:")
|
||||
result["answer"] = response[answer_start:].strip()
|
||||
else:
|
||||
# Fallback parsing if not properly formatted
|
||||
result["answer"] = response.strip()
|
||||
|
||||
return result
|
||||
|
||||
|
||||
class GKPAgent:
|
||||
"""
|
||||
Generated Knowledge Prompting (GKP) Agent that enhances reasoning by generating
|
||||
relevant knowledge before answering queries.
|
||||
|
||||
This agent implements the approach described in Liu et al. 2022, generating knowledge
|
||||
to improve performance on tasks requiring commonsense reasoning and factual information.
|
||||
|
||||
Attributes:
|
||||
agent_name (str): Name of the GKP agent
|
||||
model_name (str): Model to use for all components
|
||||
num_knowledge_items (int): Number of knowledge items to generate per query
|
||||
knowledge_generator (KnowledgeGenerator): Component for generating knowledge
|
||||
reasoner (Reasoner): Component for reasoning using the generated knowledge
|
||||
conversation (Conversation): Conversation history manager
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
agent_name: str = "gkp-agent",
|
||||
model_name: str = "openai/o1",
|
||||
num_knowledge_items: int = 6,
|
||||
) -> None:
|
||||
"""
|
||||
Initialize the GKP Agent with its components.
|
||||
|
||||
Args:
|
||||
agent_name (str): Name identifier for the agent
|
||||
model_name (str): LLM model to use for all components
|
||||
num_knowledge_items (int): Number of knowledge snippets to generate for each query
|
||||
"""
|
||||
self.agent_name = agent_name
|
||||
self.model_name = model_name
|
||||
self.num_knowledge_items = num_knowledge_items
|
||||
self.conversation = Conversation(time_enabled=True)
|
||||
|
||||
# Initialize components
|
||||
self.knowledge_generator = KnowledgeGenerator(
|
||||
agent_name=f"{agent_name}-knowledge-generator",
|
||||
model_name=model_name,
|
||||
num_knowledge_items=num_knowledge_items,
|
||||
)
|
||||
|
||||
self.reasoner = Reasoner(
|
||||
agent_name=f"{agent_name}-reasoner",
|
||||
model_name=model_name,
|
||||
)
|
||||
|
||||
# Create the final response coordinator agent
|
||||
coordinator_system_prompt = (
|
||||
self._create_coordinator_system_prompt()
|
||||
)
|
||||
self.coordinator = Agent(
|
||||
agent_name=f"{agent_name}-coordinator",
|
||||
agent_description="Coordinates multiple reasoning paths to provide the best final answer",
|
||||
system_prompt=coordinator_system_prompt,
|
||||
model_name=model_name,
|
||||
max_loops=1,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Initialized {self.agent_name} with model {self.model_name}"
|
||||
)
|
||||
|
||||
def _create_coordinator_system_prompt(self) -> str:
|
||||
"""
|
||||
Create the system prompt for the response coordinator.
|
||||
|
||||
Returns:
|
||||
str: System prompt with instructions
|
||||
"""
|
||||
system_prompt = """
|
||||
You are a specialized coordination agent that analyzes multiple reasoning paths and answers to determine the most accurate final response.
|
||||
|
||||
For each query, you will receive:
|
||||
1. The original question
|
||||
2. Multiple reasoning paths, each with:
|
||||
- Generated knowledge used for reasoning
|
||||
- An explanation of the reasoning process
|
||||
- A confidence assessment
|
||||
- An answer derived from that reasoning path
|
||||
|
||||
Your task is to:
|
||||
1. Analyze all reasoning paths
|
||||
2. Determine which path(s) have the most accurate and reliable reasoning
|
||||
3. Assess the confidence levels provided
|
||||
4. Resolve any contradictions between different answers
|
||||
5. Provide a final, definitive answer that represents the most accurate conclusion
|
||||
|
||||
Structure your response as follows:
|
||||
"Analysis: [Brief analysis of the different reasoning paths]
|
||||
Final Answer: [Clear, definitive answer to the original question]
|
||||
Explanation: [Explanation supporting your final answer, drawing from the best elements of the reasoning paths]"
|
||||
|
||||
Be objective and precise. Your goal is to determine the most accurate answer based on the quality of reasoning and knowledge provided in each path.
|
||||
"""
|
||||
|
||||
return system_prompt
|
||||
|
||||
def process(self, query: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Process a query using the GKP approach.
|
||||
|
||||
Args:
|
||||
query (str): The query to process
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: Dictionary containing the full processing results
|
||||
"""
|
||||
start_time = time.time()
|
||||
logger.info(f"Processing query: {query}")
|
||||
|
||||
# 1. Generate knowledge
|
||||
knowledge_items = self.knowledge_generator.generate_knowledge(
|
||||
query
|
||||
)
|
||||
|
||||
# 2. Use each knowledge item to reason about the query
|
||||
reasoning_results = []
|
||||
for i, knowledge in enumerate(knowledge_items):
|
||||
logger.debug(f"Reasoning with knowledge item {i+1}")
|
||||
reasoning_result = self.reasoner.reason_and_answer(
|
||||
query, knowledge
|
||||
)
|
||||
reasoning_result["knowledge"] = knowledge
|
||||
reasoning_results.append(reasoning_result)
|
||||
|
||||
# 3. Coordinate the different reasoning paths to produce final answer
|
||||
final_answer = self._coordinate_answers(
|
||||
query, reasoning_results
|
||||
)
|
||||
|
||||
# 4. Record in conversation history
|
||||
self.conversation.add("user", query)
|
||||
self.conversation.add("assistant", final_answer["response"])
|
||||
|
||||
end_time = time.time()
|
||||
process_time = end_time - start_time
|
||||
logger.info(f"Query processed in {process_time:.2f}s")
|
||||
|
||||
# Return complete results
|
||||
return {
|
||||
"query": query,
|
||||
"knowledge_items": knowledge_items,
|
||||
"reasoning_results": reasoning_results,
|
||||
"final_answer": final_answer,
|
||||
"process_time": process_time,
|
||||
}
|
||||
|
||||
def _coordinate_answers(
|
||||
self, query: str, reasoning_results: List[Dict[str, str]]
|
||||
) -> Dict[str, str]:
|
||||
"""
|
||||
Coordinate multiple reasoning paths to produce the final answer.
|
||||
|
||||
Args:
|
||||
query (str): The original query
|
||||
reasoning_results (List[Dict[str, str]]): Results from multiple reasoning paths
|
||||
|
||||
Returns:
|
||||
Dict[str, str]: The final coordinated answer
|
||||
"""
|
||||
# Format the prompt for the coordinator
|
||||
prompt_parts = [f"Question: {query}\n"]
|
||||
|
||||
for i, result in enumerate(reasoning_results):
|
||||
prompt_parts.append(f"Reasoning Path {i+1}:")
|
||||
prompt_parts.append(f"Knowledge: {result['knowledge']}")
|
||||
prompt_parts.append(
|
||||
f"Explanation: {result['explanation']}"
|
||||
)
|
||||
prompt_parts.append(f"Confidence: {result['confidence']}")
|
||||
prompt_parts.append(f"Answer: {result['answer']}\n")
|
||||
|
||||
prompt_parts.append(
|
||||
"Based on these reasoning paths, provide your final answer."
|
||||
)
|
||||
prompt = "\n".join(prompt_parts)
|
||||
|
||||
logger.debug("Coordinating multiple reasoning paths")
|
||||
response = self.coordinator.run(task=prompt)
|
||||
|
||||
# Parse the coordinated response
|
||||
result = {"analysis": "", "response": "", "explanation": ""}
|
||||
|
||||
if "Analysis:" in response and "Final Answer:" in response:
|
||||
# Extract analysis
|
||||
analysis_start = response.find("Analysis:") + len(
|
||||
"Analysis:"
|
||||
)
|
||||
analysis_end = response.find("Final Answer:")
|
||||
result["analysis"] = response[
|
||||
analysis_start:analysis_end
|
||||
].strip()
|
||||
|
||||
# Extract final answer
|
||||
answer_start = response.find("Final Answer:") + len(
|
||||
"Final Answer:"
|
||||
)
|
||||
|
||||
if "Explanation:" in response:
|
||||
answer_end = response.find("Explanation:")
|
||||
explanation_start = answer_end + len("Explanation:")
|
||||
|
||||
result["response"] = response[
|
||||
answer_start:answer_end
|
||||
].strip()
|
||||
result["explanation"] = response[
|
||||
explanation_start:
|
||||
].strip()
|
||||
else:
|
||||
result["response"] = response[answer_start:].strip()
|
||||
else:
|
||||
# Fallback if not properly formatted
|
||||
result["response"] = response.strip()
|
||||
|
||||
return result
|
||||
|
||||
def run(
|
||||
self, queries: List[str], detailed_output: bool = False
|
||||
) -> Union[List[str], List[Dict[str, Any]]]:
|
||||
"""
|
||||
Run the GKP agent on a list of queries.
|
||||
|
||||
Args:
|
||||
queries (List[str]): List of queries to process
|
||||
detailed_output (bool): Whether to return detailed processing results
|
||||
|
||||
Returns:
|
||||
Union[List[str], List[Dict[str, Any]]]: List of answers or detailed results
|
||||
"""
|
||||
results = []
|
||||
|
||||
for i, query in enumerate(queries):
|
||||
logger.info(f"Processing query {i+1}/{len(queries)}")
|
||||
process_result = self.process(query)
|
||||
|
||||
if detailed_output:
|
||||
results.append(process_result)
|
||||
else:
|
||||
results.append(
|
||||
process_result["final_answer"]["response"]
|
||||
)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
# # Example usage
|
||||
# if __name__ == "__main__":
|
||||
# # Initialize the GKP Agent
|
||||
# agent = GKPAgent(
|
||||
# agent_name="gkp-agent",
|
||||
# model_name="gpt-4o-mini", # Using OpenAI's model
|
||||
# num_knowledge_items=10,  # Generate 10 knowledge items per query
|
||||
# )
|
||||
|
||||
# # Example queries
|
||||
# queries = [
|
||||
# "Create an entirely new construct of mathematics unifying physics and traditional physics never seen",
|
||||
# ]
|
||||
|
||||
# # Run the agent
|
||||
# results = agent.run(queries)
|
||||
|
||||
# print(results)
|
||||
|
||||
# # Print results
|
||||
# for i, result in enumerate(results):
|
||||
# print(f"\n\nQUERY {i+1}:")
|
||||
# print(f"{queries[i]}\n")
|
||||
# print("FINAL ANSWER:")
|
||||
# print(f"{result}")
|
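# Illustrative sketch only (not part of the diff): reading the detailed output of
# GKPAgent.run, using the keys returned by process() above.
#
# agent = GKPAgent(model_name="gpt-4o-mini", num_knowledge_items=2)
# detailed = agent.run(["Why does ice float on water?"], detailed_output=True)
# for record in detailed:
#     print(record["query"])
#     print(record["knowledge_items"])            # generated knowledge statements
#     print(record["final_answer"]["response"])   # coordinated final answer
#     print(f"processed in {record['process_time']:.2f}s")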
@@ -0,0 +1,38 @@
AGENT_JUDGE_PROMPT = """
# Adaptive Output Evaluator - Role and Protocol

Your role is to critically evaluate outputs across diverse domains by first understanding the context, then applying domain-appropriate evaluation criteria to provide a well-reasoned assessment.

## Core Responsibilities

1. **Context Assessment**
   - Begin by identifying the domain and specific context of the evaluation (technical, creative, analytical, etc.)
   - Determine the appropriate evaluation framework based on domain requirements
   - Adjust evaluation criteria and standards to match domain-specific best practices
   - If the domain is unclear, request clarification with: DOMAIN CLARIFICATION NEEDED: *specific_question*

2. **Input Validation**
   - Ensure all necessary information is present for a comprehensive evaluation
   - Identify gaps in the provided materials that would impact assessment quality
   - Request additional context when needed with: ADDITIONAL CONTEXT NEEDED: *specific_information*
   - Consider implicit domain knowledge that may influence a proper evaluation

3. **Evidence-Based Analysis**
   - Apply domain-specific criteria to evaluate accuracy, effectiveness, and appropriateness
   - Distinguish between factual claims, reasoned arguments, and subjective opinions
   - Flag assumptions or claims lacking sufficient support within domain standards
   - Evaluate internal consistency and alignment with established principles in the field
   - For technical domains, verify logical and methodological soundness

4. **Comparative Assessment**
   - When multiple solutions or approaches are presented, compare their relative strengths
   - Identify trade-offs between different approaches within domain constraints
   - Consider alternative interpretations or solutions not explicitly mentioned
   - Balance competing priorities based on domain-specific values and standards

5. **Final Assessment Declaration**
   - Present your final assessment with: **EVALUATION_COMPLETE \\boxed{_assessment_summary_}**
   - Follow with a concise justification referencing domain-specific standards
   - Include constructive feedback for improvement where appropriate
   - When appropriate, suggest alternative approaches that align with domain best practices
"""