Temp skip loading docs

pull/570/head
Richard Anthony Hein 8 months ago
parent 9e2e7342ac
commit 4fae6839eb

.gitignore vendored  +1 −0

@@ -225,3 +225,4 @@ swarms/server/.chroma_db
 .chroma_db
 .embeddings
 .parent_documents
+docs/metadata.json
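
For context, docs/metadata.json is the per-run ingestion ledger written by the loading code in the next hunk, which is why it is now ignored rather than committed. Its shape below is taken from that code's field names ("processDate", "processed_files", "file", "processed_at"); the date and file path values are purely illustrative:

    # Sketch of the metadata.json contents, expressed as the Python dict
    # the ingestion code builds and json.dump()s (values illustrative).
    metadata = {
        "processDate": "2024-05-01 12:00:00.000000",  # str(datetime.now()) at ingest start
        "processed_files": [                          # one entry per markdown file loaded
            {
                "file": "docs/example.md",            # hypothetical path
                "processed_at": "2024-05-01 12:00:01.000000",
            },
        ],
    }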

@@ -84,102 +84,102 @@ class VectorStorage:
         # Ensure only one process/thread is executing this method at a time
         lock = asyncio.Lock()
         async with lock:
-            subdir_start_time = datetime.now()
-            print(f"Start {subdir} processing time: {subdir_start_time}")
-            # get all existing collections
-            collections = self.client.list_collections()
-            print(f"Existing collections: {collections}")
-            # Initialize an empty list to hold the documents
-            documents = []
-            # Define the maximum number of files to load at a time
-            max_files = 1000
-            # Load existing metadata
-            metadata_file = f"{self.directoryOrUrl}/metadata.json"
-            metadata = {"processDate": str(datetime.now()), "processed_files": []}
-            processed_files = set()  # Track processed files
-            if os.path.isfile(metadata_file):
-                with open(metadata_file, "r") as metadataFile:
-                    metadata = dict[str, str](json.load(metadataFile))
-                    processed_files = {entry["file"] for entry in metadata.get("processed_files", [])}
-            # Get a list of all files in the directory and exclude processed files
-            all_files = [
-                file for file in glob.glob(f"{self.directoryOrUrl}/**/*.md", recursive=True)
-                if file not in processed_files
-            ]
-            print(f"Loading {len(all_files)} documents for title version {subdir}.")
-            # Load files in chunks of max_files
-            for i in range(0, len(all_files), max_files):
-                chunksStartTime = datetime.now()
-                chunk_files = all_files[i : i + max_files]
-                for file in chunk_files:
-                    loader = UnstructuredMarkdownLoader(
-                        file,
-                        mode="single",
-                        strategy="fast"
-                    )
-                    print(f"Loaded {file} in {subdir} ...")
-                    documents.extend(loader.load())
-                    # Record the file as processed in metadata
-                    metadata["processed_files"].append({
-                        "file": file,
-                        "processed_at": str(datetime.now())
-                    })
-                print(f"Creating new collection for {self.directoryOrUrl}...")
-                # Create or get the collection
-                collection = self.client.create_collection(
-                    name=self.directoryOrUrl,
-                    get_or_create=True,
-                    metadata={"processDate": metadata["processDate"]},
-                )
-                # Reload vectorstore based on collection
-                vectorstore = self.getVectorStore(collection_name=self.directoryOrUrl)
-                # Create a new parent document retriever
-                retriever = AsyncParentDocumentRetriever(
-                    docstore=self.store,
-                    vectorstore=vectorstore,
-                    child_splitter=self.child_splitter,
-                    parent_splitter=self.parent_splitter,
-                )
-                # force reload of collection to make sure we don't have the default langchain collection
-                collection = self.client.get_collection(name=self.directoryOrUrl)
-                vectorstore = self.getVectorStore(collection_name=self.directoryOrUrl)
-                # Add documents to the collection and docstore
-                print(f"Adding {len(documents)} documents to collection...")
-                add_docs_start_time = datetime.now()
-                await retriever.aadd_documents(
-                    documents=documents, add_to_docstore=True
-                )
-                add_docs_end_time = datetime.now()
-                print(
-                    f"Adding {len(documents)} documents to collection took: {add_docs_end_time - add_docs_start_time}"
-                )
-                documents = []  # clear documents list for next chunk
-                # Save metadata to the metadata.json file
-                with open(metadata_file, "w") as metadataFile:
-                    json.dump(metadata, metadataFile, indent=4)
-                print(f"Loaded {len(documents)} documents for directory '{subdir}'.")
-                chunksEndTime = datetime.now()
-                print(
-                    f"{max_files} markdown file chunks processing time: {chunksEndTime - chunksStartTime}"
-                )
-            subdir_end_time = datetime.now()
-            print(f"Subdir {subdir} processing end time: {subdir_end_time}")
-            print(f"Time taken: {subdir_end_time - subdir_start_time}")
+            # subdir_start_time = datetime.now()
+            # print(f"Start {subdir} processing time: {subdir_start_time}")
+            # # get all existing collections
+            # collections = self.client.list_collections()
+            # print(f"Existing collections: {collections}")
+            # # Initialize an empty list to hold the documents
+            # documents = []
+            # # Define the maximum number of files to load at a time
+            # max_files = 1000
+            # # Load existing metadata
+            # metadata_file = f"{self.directoryOrUrl}/metadata.json"
+            # metadata = {"processDate": str(datetime.now()), "processed_files": []}
+            # processed_files = set()  # Track processed files
+            # if os.path.isfile(metadata_file):
+            #     with open(metadata_file, "r") as metadataFile:
+            #         metadata = dict[str, str](json.load(metadataFile))
+            #         processed_files = {entry["file"] for entry in metadata.get("processed_files", [])}
+            # # Get a list of all files in the directory and exclude processed files
+            # all_files = [
+            #     file for file in glob.glob(f"{self.directoryOrUrl}/**/*.md", recursive=True)
+            #     if file not in processed_files
+            # ]
+            # print(f"Loading {len(all_files)} documents for title version {subdir}.")
+            # # Load files in chunks of max_files
+            # for i in range(0, len(all_files), max_files):
+            #     chunksStartTime = datetime.now()
+            #     chunk_files = all_files[i : i + max_files]
+            #     for file in chunk_files:
+            #         loader = UnstructuredMarkdownLoader(
+            #             file,
+            #             mode="single",
+            #             strategy="fast"
+            #         )
+            #         print(f"Loaded {file} in {subdir} ...")
+            #         documents.extend(loader.load())
+            #         # Record the file as processed in metadata
+            #         metadata["processed_files"].append({
+            #             "file": file,
+            #             "processed_at": str(datetime.now())
+            #         })
+            #     print(f"Creating new collection for {self.directoryOrUrl}...")
+            #     # Create or get the collection
+            #     collection = self.client.create_collection(
+            #         name=self.directoryOrUrl,
+            #         get_or_create=True,
+            #         metadata={"processDate": metadata["processDate"]},
+            #     )
+            #     # Reload vectorstore based on collection
+            #     vectorstore = self.getVectorStore(collection_name=self.directoryOrUrl)
+            #     # Create a new parent document retriever
+            #     retriever = AsyncParentDocumentRetriever(
+            #         docstore=self.store,
+            #         vectorstore=vectorstore,
+            #         child_splitter=self.child_splitter,
+            #         parent_splitter=self.parent_splitter,
+            #     )
+            #     # force reload of collection to make sure we don't have the default langchain collection
+            #     collection = self.client.get_collection(name=self.directoryOrUrl)
+            #     vectorstore = self.getVectorStore(collection_name=self.directoryOrUrl)
+            #     # Add documents to the collection and docstore
+            #     print(f"Adding {len(documents)} documents to collection...")
+            #     add_docs_start_time = datetime.now()
+            #     await retriever.aadd_documents(
+            #         documents=documents, add_to_docstore=True
+            #     )
+            #     add_docs_end_time = datetime.now()
+            #     print(
+            #         f"Adding {len(documents)} documents to collection took: {add_docs_end_time - add_docs_start_time}"
+            #     )
+            #     documents = []  # clear documents list for next chunk
+            #     # Save metadata to the metadata.json file
+            #     with open(metadata_file, "w") as metadataFile:
+            #         json.dump(metadata, metadataFile, indent=4)
+            #     print(f"Loaded {len(documents)} documents for directory '{subdir}'.")
+            #     chunksEndTime = datetime.now()
+            #     print(
+            #         f"{max_files} markdown file chunks processing time: {chunksEndTime - chunksStartTime}"
+            #     )
+            # subdir_end_time = datetime.now()
+            # print(f"Subdir {subdir} processing end time: {subdir_end_time}")
+            # print(f"Time taken: {subdir_end_time - subdir_start_time}")
             # Reload vectorstore based on collection to pass to parent doc retriever
             # collection = self.client.get_collection(name=self.directoryOrUrl)
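
A side note on the context lines this hunk leaves untouched: lock = asyncio.Lock() is created fresh inside the method, so every call acquires its own private lock object and concurrent calls are never actually serialized, despite the comment above it. A minimal sketch of the usual fix is below, with a shared module-level lock; the lock name and method name are hypothetical, since the real signature falls outside this hunk:

    import asyncio

    # Created once at import time and shared by every caller (safe to create
    # outside a running event loop on Python 3.10+).
    _ingest_lock = asyncio.Lock()

    class VectorStorage:
        async def initRetrievers(self, subdir: str) -> None:  # hypothetical name
            # All concurrent calls now await the same lock object, so only one
            # ingestion pass runs at a time.
            async with _ingest_lock:
                ...  # document loading/indexing body shown in the diff above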
