pull/634/head
Your Name 2 months ago
parent 2b4be2bef7
commit 4a9a0ba3ef

@ -1,141 +0,0 @@
from typing import Any, List, Optional, Union
from pathlib import Path
from loguru import logger
from doc_master import doc_master
from concurrent.futures import ThreadPoolExecutor, as_completed
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    # Surface the underlying exception after the last attempt instead of
    # tenacity's RetryError wrapper, so callers log the real failure cause.
    reraise=True,
)
def _process_document(doc_path: Union[str, Path]) -> str:
    """Convert a single document to text, retrying on failure.

    The call is attempted up to three times with exponential back-off
    between attempts (as configured in the ``@retry`` decorator).

    Args:
        doc_path: Path to the document to process.

    Returns:
        Whatever ``doc_master`` returns for ``output_type="string"``.

    Raises:
        Exception: The exception raised by ``doc_master`` on the final
            attempt, once all retries are exhausted.
    """
    try:
        return doc_master(
            file_path=str(doc_path), output_type="string"
        )
    except Exception as e:
        # Logged on every failed attempt; re-raising lets tenacity decide
        # whether to retry or give up.
        logger.error(
            f"Error processing document {doc_path}: {str(e)}"
        )
        raise
def handle_input_docs(
    agents: Any,
    docs: Optional[List[Union[str, Path]]] = None,
    doc_folder: Optional[Union[str, Path]] = None,
    max_workers: int = 4,
    chunk_size: int = 1000000,
) -> Any:
    """Append the text of the given documents to every agent's system prompt.

    Individual documents are converted in parallel and combined in the same
    order as ``docs``; the content of ``doc_folder`` (when given) is appended
    after them. The combined text is added to ``agent.system_prompt`` under a
    ``Documents:`` header.

    Args:
        agents: Mapping of agent names to agent objects; each value must
            expose a ``system_prompt`` string attribute.
        docs: List of document paths.
        doc_folder: Path to a folder containing documents.
        max_workers: Maximum number of parallel document processing workers.
        chunk_size: Retained for backward compatibility. The combined text
            is appended in a single operation, which produces the same
            prompt as appending it in chunks of any positive size.

    Returns:
        The ``agents`` object after its prompts were updated, or ``None``
        when there is nothing to do (no agents, or neither ``docs`` nor
        ``doc_folder`` provided).

    Raises:
        RuntimeError: If document processing, folder processing, or the
            prompt update fails. The original exception is attached as
            ``__cause__``.
    """
    if not agents:
        logger.warning(
            "No agents provided, skipping document distribution"
        )
        return
    if not docs and not doc_folder:
        logger.warning(
            "No documents or folder provided, skipping document distribution"
        )
        return

    logger.info("Starting document distribution to agents")

    try:
        processed_docs = []

        # Process individual documents in parallel. Results are collected in
        # submission order so the resulting prompt is deterministic.
        if docs:
            with ThreadPoolExecutor(
                max_workers=max_workers
            ) as executor:
                submitted = [
                    (doc, executor.submit(_process_document, doc))
                    for doc in docs
                ]
                for doc, future in submitted:
                    try:
                        processed_docs.append(future.result())
                    except Exception as e:
                        logger.error(
                            f"Failed to process document {doc}: {str(e)}"
                        )
                        raise RuntimeError(
                            f"Document processing failed: {str(e)}"
                        ) from e

        # Process the folder as well when one is specified; it is no longer
        # silently ignored when ``docs`` is also provided.
        if doc_folder:
            try:
                folder_content = doc_master(
                    folder_path=str(doc_folder), output_type="string"
                )
                processed_docs.append(folder_content)
            except Exception as e:
                logger.error(
                    f"Failed to process folder {doc_folder}: {str(e)}"
                )
                raise RuntimeError(
                    f"Folder processing failed: {str(e)}"
                ) from e

        combined_data = "\n".join(processed_docs)

        # Nothing is appended (not even the header) when no text was produced.
        if combined_data:
            addition = "\nDocuments:\n" + combined_data
            for agent in agents.values():
                try:
                    agent.system_prompt += addition
                except Exception as e:
                    logger.error(
                        f"Failed to update agent prompt: {str(e)}"
                    )
                    raise RuntimeError(
                        f"Agent prompt update failed: {str(e)}"
                    ) from e

        logger.info(
            f"Successfully added documents to {len(agents)} agents"
        )
        return agents
    except Exception as e:
        # Top-level boundary: every failure (including the more specific
        # RuntimeErrors raised above) is reported under one message prefix.
        logger.error(f"Document distribution failed: {str(e)}")
        raise RuntimeError(
            f"Document distribution failed: {str(e)}"
        ) from e

@ -1,102 +0,0 @@
from typing import Union, Dict, List, Tuple, Any
def any_to_str(data: Union[str, Dict, List, Tuple, Any]) -> str:
    """Convert any input value to a readable string.

    Nested containers are processed recursively.

    Args:
        data: Input data of any type to convert to string.

    Returns:
        str: A formatted representation of ``data``:
            - Dictionaries become one ``key: value`` entry per line. A
              non-empty nested dictionary is placed on the following lines,
              indented by two spaces per nesting level, so inner keys cannot
              be mistaken for outer ones.
            - Lists are rendered as ``[a, b]`` and tuples as ``(a, b)``.
            - Strings are wrapped in double quotes.
            - ``None`` becomes ``"None"``.
            - Anything else is converted with ``str()``.
        If the conversion raises, the text ``Error converting data: <message>``
        is returned instead of propagating the exception.

    Examples:
        >>> print(any_to_str({'a': 1, 'b': 2}))
        a: 1
        b: 2
        >>> print(any_to_str({'user': {'id': 1}}))
        user:
          id: 1
        >>> any_to_str([1, 'x', None])
        '[1, "x", None]'
        >>> any_to_str(None)
        'None'
    """
    indent = "  "
    try:
        if isinstance(data, dict):
            entries = []
            for key, value in data.items():
                rendered = any_to_str(value)
                if isinstance(value, dict) and value:
                    # Indent every line of the nested mapping one level
                    # deeper than its parent key.
                    nested = "\n".join(
                        indent + line for line in rendered.split("\n")
                    )
                    entries.append(f"{key}:\n{nested}")
                else:
                    entries.append(f"{key}: {rendered}")
            return "\n".join(entries)
        if isinstance(data, (list, tuple)):
            joined = ", ".join(any_to_str(item) for item in data)
            if isinstance(data, list):
                return f"[{joined}]"
            return f"({joined})"
        if data is None:
            return "None"
        if isinstance(data, str):
            return f'"{data}"'
        return str(data)
    except Exception as e:
        # Deliberate best-effort: a failing __str__ (or similar) yields an
        # error description rather than an exception.
        return f"Error converting data: {str(e)}"
def main():
    """Print sample any_to_str conversions for several kinds of input."""
    person = {
        "name": "John",
        "age": 30,
        "hobbies": ["reading", "hiking"],
    }
    account = {
        "user": {
            "id": 123,
            "details": {"city": "New York", "active": True},
        },
        "data": [1, 2, 3],
    }

    # Flat mapping
    print("Dictionary:")
    print(any_to_str(person))

    # Mapping containing further mappings
    print("\nNested Dictionary:")
    print(any_to_str(account))

    print("\nList and Tuple:")
    for sequence in ([1, "text", None, (1, 2)], (True, False, None)):
        print(any_to_str(sequence))

    print("\nEmpty Collections:")
    for empty in ([], {}):
        print(any_to_str(empty))


if __name__ == "__main__":
    main()
Loading…
Cancel
Save