From 4fae6839ebb6b503cab6c1cdd158d0a89b848573 Mon Sep 17 00:00:00 2001
From: Richard Anthony Hein
Date: Mon, 19 Aug 2024 17:30:17 +0000
Subject: [PATCH] Temp skip loading docs

---
 .gitignore                    |   1 +
 swarms/server/vector_store.py | 192 +++++++++++++++++-----------------
 2 files changed, 97 insertions(+), 96 deletions(-)

diff --git a/.gitignore b/.gitignore
index 93af3caf..ed319a78 100644
--- a/.gitignore
+++ b/.gitignore
@@ -225,3 +225,4 @@ swarms/server/.chroma_db
 .chroma_db
 .embeddings
 .parent_documents
+docs/metadata.json
diff --git a/swarms/server/vector_store.py b/swarms/server/vector_store.py
index 04f2e6fe..16f853f5 100644
--- a/swarms/server/vector_store.py
+++ b/swarms/server/vector_store.py
@@ -84,102 +84,102 @@ class VectorStorage:
         # Ensure only one process/thread is executing this method at a time
         lock = asyncio.Lock()
         async with lock:
-            subdir_start_time = datetime.now()
-            print(f"Start {subdir} processing time: {subdir_start_time}")
-
-            # get all existing collections
-            collections = self.client.list_collections()
-            print(f"Existing collections: {collections}")
-
-            # Initialize an empty list to hold the documents
-            documents = []
-            # Define the maximum number of files to load at a time
-            max_files = 1000
-
-            # Load existing metadata
-            metadata_file = f"{self.directoryOrUrl}/metadata.json"
-            metadata = {"processDate": str(datetime.now()), "processed_files": []}
-            processed_files = set()  # Track processed files
-            if os.path.isfile(metadata_file):
-                with open(metadata_file, "r") as metadataFile:
-                    metadata = dict[str, str](json.load(metadataFile))
-                    processed_files = {entry["file"] for entry in metadata.get("processed_files", [])}
-
-            # Get a list of all files in the directory and exclude processed files
-            all_files = [
-                file for file in glob.glob(f"{self.directoryOrUrl}/**/*.md", recursive=True)
-                if file not in processed_files
-            ]
-
-            print(f"Loading {len(all_files)} documents for title version {subdir}.")
-            # Load files in chunks of max_files
-            for i in range(0, len(all_files), max_files):
-                chunksStartTime = datetime.now()
-                chunk_files = all_files[i : i + max_files]
-                for file in chunk_files:
-                    loader = UnstructuredMarkdownLoader(
-                        file,
-                        mode="single",
-                        strategy="fast"
-                    )
-                    print(f"Loaded {file} in {subdir} ...")
-                    documents.extend(loader.load())
-
-                    # Record the file as processed in metadata
-                    metadata["processed_files"].append({
-                        "file": file,
-                        "processed_at": str(datetime.now())
-                    })
-
-                print(f"Creating new collection for {self.directoryOrUrl}...")
-                # Create or get the collection
-                collection = self.client.create_collection(
-                    name=self.directoryOrUrl,
-                    get_or_create=True,
-                    metadata={"processDate": metadata["processDate"]},
-                )
-
-                # Reload vectorstore based on collection
-                vectorstore = self.getVectorStore(collection_name=self.directoryOrUrl)
-
-                # Create a new parent document retriever
-                retriever = AsyncParentDocumentRetriever(
-                    docstore=self.store,
-                    vectorstore=vectorstore,
-                    child_splitter=self.child_splitter,
-                    parent_splitter=self.parent_splitter,
-                )
-
-                # force reload of collection to make sure we don't have the default langchain collection
-                collection = self.client.get_collection(name=self.directoryOrUrl)
-                vectorstore = self.getVectorStore(collection_name=self.directoryOrUrl)
-
-                # Add documents to the collection and docstore
-                print(f"Adding {len(documents)} documents to collection...")
-                add_docs_start_time = datetime.now()
-                await retriever.aadd_documents(
-                    documents=documents, add_to_docstore=True
-                )
-                add_docs_end_time = datetime.now()
-                print(
-                    f"Adding {len(documents)} documents to collection took: {add_docs_end_time - add_docs_start_time}"
-                )
-
-                documents = []  # clear documents list for next chunk
-
-            # Save metadata to the metadata.json file
-            with open(metadata_file, "w") as metadataFile:
-                json.dump(metadata, metadataFile, indent=4)
-
-            print(f"Loaded {len(documents)} documents for directory '{subdir}'.")
-            chunksEndTime = datetime.now()
-            print(
-                f"{max_files} markdown file chunks processing time: {chunksEndTime - chunksStartTime}"
-            )
-
-            subdir_end_time = datetime.now()
-            print(f"Subdir {subdir} processing end time: {subdir_end_time}")
-            print(f"Time taken: {subdir_end_time - subdir_start_time}")
+            # subdir_start_time = datetime.now()
+            # print(f"Start {subdir} processing time: {subdir_start_time}")
+
+            # # get all existing collections
+            # collections = self.client.list_collections()
+            # print(f"Existing collections: {collections}")
+
+            # # Initialize an empty list to hold the documents
+            # documents = []
+            # # Define the maximum number of files to load at a time
+            # max_files = 1000
+
+            # # Load existing metadata
+            # metadata_file = f"{self.directoryOrUrl}/metadata.json"
+            # metadata = {"processDate": str(datetime.now()), "processed_files": []}
+            # processed_files = set()  # Track processed files
+            # if os.path.isfile(metadata_file):
+            #     with open(metadata_file, "r") as metadataFile:
+            #         metadata = dict[str, str](json.load(metadataFile))
+            #         processed_files = {entry["file"] for entry in metadata.get("processed_files", [])}
+
+            # # Get a list of all files in the directory and exclude processed files
+            # all_files = [
+            #     file for file in glob.glob(f"{self.directoryOrUrl}/**/*.md", recursive=True)
+            #     if file not in processed_files
+            # ]
+
+            # print(f"Loading {len(all_files)} documents for title version {subdir}.")
+            # # Load files in chunks of max_files
+            # for i in range(0, len(all_files), max_files):
+            #     chunksStartTime = datetime.now()
+            #     chunk_files = all_files[i : i + max_files]
+            #     for file in chunk_files:
+            #         loader = UnstructuredMarkdownLoader(
+            #             file,
+            #             mode="single",
+            #             strategy="fast"
+            #         )
+            #         print(f"Loaded {file} in {subdir} ...")
+            #         documents.extend(loader.load())
+
+            #         # Record the file as processed in metadata
+            #         metadata["processed_files"].append({
+            #             "file": file,
+            #             "processed_at": str(datetime.now())
+            #         })
+
+            #     print(f"Creating new collection for {self.directoryOrUrl}...")
+            #     # Create or get the collection
+            #     collection = self.client.create_collection(
+            #         name=self.directoryOrUrl,
+            #         get_or_create=True,
+            #         metadata={"processDate": metadata["processDate"]},
+            #     )
+
+            #     # Reload vectorstore based on collection
+            #     vectorstore = self.getVectorStore(collection_name=self.directoryOrUrl)
+
+            #     # Create a new parent document retriever
+            #     retriever = AsyncParentDocumentRetriever(
+            #         docstore=self.store,
+            #         vectorstore=vectorstore,
+            #         child_splitter=self.child_splitter,
+            #         parent_splitter=self.parent_splitter,
+            #     )
+
+            #     # force reload of collection to make sure we don't have the default langchain collection
+            #     collection = self.client.get_collection(name=self.directoryOrUrl)
+            #     vectorstore = self.getVectorStore(collection_name=self.directoryOrUrl)
+
+            #     # Add documents to the collection and docstore
+            #     print(f"Adding {len(documents)} documents to collection...")
+            #     add_docs_start_time = datetime.now()
+            #     await retriever.aadd_documents(
+            #         documents=documents, add_to_docstore=True
+            #     )
+            #     add_docs_end_time = datetime.now()
+            #     print(
+            #         f"Adding {len(documents)} documents to collection took: {add_docs_end_time - add_docs_start_time}"
+            #     )
+
+            #     documents = []  # clear documents list for next chunk
+
+            # # Save metadata to the metadata.json file
+            # with open(metadata_file, "w") as metadataFile:
+            #     json.dump(metadata, metadataFile, indent=4)
+
+            # print(f"Loaded {len(documents)} documents for directory '{subdir}'.")
+            # chunksEndTime = datetime.now()
+            # print(
+            #     f"{max_files} markdown file chunks processing time: {chunksEndTime - chunksStartTime}"
+            # )
+
+            # subdir_end_time = datetime.now()
+            # print(f"Subdir {subdir} processing end time: {subdir_end_time}")
+            # print(f"Time taken: {subdir_end_time - subdir_start_time}")
 
             # Reload vectorstore based on collection to pass to parent doc retriever
             # collection = self.client.get_collection(name=self.directoryOrUrl)