You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
217 lines
7.3 KiB
217 lines
7.3 KiB
#!/usr/bin/env python3
|
|
"""
|
|
Simple command-line Q&A environment for testing with search functionality.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import random
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
# Add project root to Python path
|
|
project_root = Path(__file__).resolve().parent.parent
|
|
sys.path.append(str(project_root))
|
|
|
|
# Import our search module and config
|
|
from src.config import DATA_DIR, logger
|
|
from src.search_module import get_question_answer, get_question_count, search
|
|
|
|
# TODO: Import verify function and router from appropriate module
|
|
# TODO: Consider moving verify function to search_module.py for better organization
|
|
|
|
|
|
class SimpleQAEnvironment:
|
|
"""Simple command-line environment for Q&A with search capability."""
|
|
|
|
def __init__(self):
|
|
self.score = {"correct": 0, "incorrect": 0, "total": 0}
|
|
self.session_data = []
|
|
self.current_question = None
|
|
self.session_file = DATA_DIR / "qa_sessions"
|
|
|
|
def display_welcome(self):
|
|
"""Display welcome message and instructions."""
|
|
logger.info("===== Search & Answer Environment =====")
|
|
logger.info(
|
|
"Answer questions using the search tool to find relevant information."
|
|
)
|
|
logger.info("Type 'q' to quit, 'h' for help.\n")
|
|
|
|
def display_help(self):
|
|
"""Display help information."""
|
|
logger.info("\n===== Commands =====")
|
|
logger.info("n - Get a new question")
|
|
logger.info("s <query> - Search for information (e.g., s program launch date)")
|
|
logger.info("a <answer> - Submit your answer")
|
|
logger.info("h - Display this help message")
|
|
logger.info("q - Quit the program\n")
|
|
|
|
def display_question(self, question: str):
|
|
"""Display the current question."""
|
|
logger.info("\n===== QUESTION =====")
|
|
logger.info(question)
|
|
logger.info("=====================\n")
|
|
|
|
def get_new_question(self) -> str:
|
|
"""Get a new random question and set it as current."""
|
|
total_questions = get_question_count()
|
|
question_id = random.randint(0, total_questions - 1)
|
|
|
|
# Updated to match new interface: get_question_answer now returns a dict.
|
|
qa = get_question_answer(question_id)
|
|
question = qa["question"]
|
|
correct_answer = qa["answer"]
|
|
|
|
question_data = {
|
|
"id": question_id,
|
|
"question": question,
|
|
"correct_answer": correct_answer,
|
|
"start_time": time.time(),
|
|
"searches": [],
|
|
}
|
|
self.current_question = question_data
|
|
return question
|
|
|
|
def perform_search(self, query: str):
|
|
"""Perform a search with the given query."""
|
|
if not query:
|
|
logger.warning("Please provide a search query.")
|
|
return
|
|
|
|
try:
|
|
logger.info("\n===== SEARCH RESULTS =====")
|
|
results = search(query)
|
|
logger.info(results)
|
|
logger.info("==========================\n")
|
|
|
|
# Record search in current question data if available.
|
|
if self.current_question is not None:
|
|
self.current_question["searches"].append(query)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error searching: {str(e)}")
|
|
|
|
async def process_answer(self, user_answer: str):
|
|
"""Process and verify the user's answer."""
|
|
if self.current_question is None:
|
|
logger.warning("Please get a question first.")
|
|
return
|
|
|
|
if not user_answer:
|
|
logger.warning("Please provide an answer.")
|
|
return
|
|
|
|
# Record answer and calculate time taken.
|
|
self.current_question["user_answer"] = user_answer
|
|
self.current_question["end_time"] = time.time()
|
|
self.current_question["time_taken"] = (
|
|
self.current_question["end_time"] - self.current_question["start_time"]
|
|
)
|
|
|
|
try:
|
|
logger.info("\nVerifying your answer...")
|
|
# TODO: Implement verify function in search_module.py
|
|
# correct = await verify(
|
|
# user_answer,
|
|
# self.current_question["question"],
|
|
# self.current_question["correct_answer"],
|
|
# router,
|
|
# )
|
|
correct = False # Temporary placeholder until verify is implemented
|
|
|
|
# Update score and inform the user.
|
|
self.score["total"] += 1
|
|
if correct:
|
|
self.score["correct"] += 1
|
|
logger.success("\n✓ Your answer is CORRECT!")
|
|
else:
|
|
self.score["incorrect"] += 1
|
|
logger.error("\n✗ Your answer is INCORRECT.")
|
|
logger.info(
|
|
f"\nThe correct answer is:\n{self.current_question['correct_answer']}"
|
|
)
|
|
|
|
logger.info(f"\nScore: {self.score['correct']}/{self.score['total']}")
|
|
|
|
# Record the result and add the current question to the session data.
|
|
self.current_question["is_correct"] = correct
|
|
self.session_data.append(self.current_question)
|
|
|
|
# Clear the current question.
|
|
self.current_question = None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error verifying answer: {str(e)}")
|
|
|
|
def save_session(self):
|
|
"""Save the session data to a file."""
|
|
if not self.session_data:
|
|
return
|
|
|
|
# Ensure session directory exists
|
|
self.session_file.mkdir(parents=True, exist_ok=True)
|
|
|
|
timestamp = time.strftime("%Y%m%d_%H%M%S")
|
|
filename = self.session_file / f"qa_session_{timestamp}.json"
|
|
|
|
session_data = {
|
|
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
|
|
"score": self.score,
|
|
"questions": self.session_data,
|
|
}
|
|
|
|
try:
|
|
with open(filename, "w") as f:
|
|
json.dump(session_data, f, indent=2)
|
|
logger.info(f"\nSession data saved to {filename}")
|
|
except Exception as e:
|
|
logger.error(f"Error saving session data: {str(e)}")
|
|
|
|
async def run(self):
|
|
"""Run the main command loop."""
|
|
self.display_welcome()
|
|
|
|
while True:
|
|
command = input("\n> ").strip()
|
|
|
|
if not command:
|
|
continue
|
|
|
|
# Process commands.
|
|
if command.lower() == "q":
|
|
break
|
|
elif command.lower() == "h":
|
|
self.display_help()
|
|
elif command.lower() == "n":
|
|
question = self.get_new_question()
|
|
self.display_question(question)
|
|
elif command.lower().startswith("s "):
|
|
query = command[2:].strip()
|
|
self.perform_search(query)
|
|
elif command.lower().startswith("a "):
|
|
answer = command[2:].strip()
|
|
await self.process_answer(answer)
|
|
else:
|
|
logger.warning("Unknown command. Type 'h' for help.")
|
|
|
|
# Save session data on exit.
|
|
self.save_session()
|
|
logger.info("\nThank you for using the Q&A environment!")
|
|
|
|
|
|
async def main():
|
|
"""Main function to start the application."""
|
|
env = SimpleQAEnvironment()
|
|
await env.run()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
asyncio.run(main())
|
|
except KeyboardInterrupt:
|
|
logger.info("\nProgram terminated by user.")
|
|
except Exception as e:
|
|
logger.error(f"\nError: {str(e)}")
|