From 7e3ba13f0fa4c907887fcb006dc60978ea931aaf Mon Sep 17 00:00:00 2001 From: Richard Anthony Hein Date: Mon, 19 Aug 2024 02:22:08 +0000 Subject: [PATCH] useGPU flag & update metadata for processed files --- swarms/server/server.py | 4 +- swarms/server/vector_store.py | 72 +++++++++++++++++------------------ 2 files changed, 39 insertions(+), 37 deletions(-) diff --git a/swarms/server/server.py b/swarms/server/server.py index dae0c1d7..7749ba90 100644 --- a/swarms/server/server.py +++ b/swarms/server/server.py @@ -93,11 +93,13 @@ if missing_vars: exit(1) useMetal = os.environ.get("USE_METAL", "False") == "True" +useGPU = os.environ.get("USE_GPU", "False") == "True" print(f"Uploads={uploads}") print(f"MODEL_DIR={model_dir}") print(f"MODEL_NAME={model_name}") print(f"USE_METAL={useMetal}") +print(f"USE_GPU={useGPU}") print(f"OPENAI_API_KEY={openai_api_key}") print(f"OPENAI_API_BASE={openai_api_base}") @@ -145,7 +147,7 @@ if not os.path.exists(uploads): os.makedirs(uploads) # Initialize the vector store -vector_store = VectorStorage(directory=uploads) +vector_store = VectorStorage(directory=uploads, useGPU=useGPU) async def create_chain( diff --git a/swarms/server/vector_store.py b/swarms/server/vector_store.py index 8116043c..d6fa550c 100644 --- a/swarms/server/vector_store.py +++ b/swarms/server/vector_store.py @@ -16,11 +16,11 @@ from swarms.server.async_parent_document_retriever import AsyncParentDocumentRet store_type = "local" # "redis" or "local" class VectorStorage: - def __init__(self, directory): + def __init__(self, directory, useGPU=False): self.embeddings = HuggingFaceBgeEmbeddings( cache_folder="./.embeddings", model_name="BAAI/bge-large-en", - model_kwargs={"device": "cuda"}, # Use GPU + model_kwargs={"device": "cuda" if useGPU else "cpu"}, encode_kwargs={"normalize_embeddings": True}, query_instruction="Represent this sentence for searching relevant passages: ", ) @@ -119,18 +119,50 @@ class VectorStorage: for file in chunk_files: loader = UnstructuredMarkdownLoader( file, - mode="elements", + mode="single", strategy="fast" ) print(f"Loaded {file} in {subdir} ...") documents.extend(loader.load()) - + # Record the file as processed in metadata metadata["processed_files"].append({ "file": file, "processed_at": str(datetime.now()) }) + print(f"Creating new collection for {self.directory}...") + # Create or get the collection + collection = self.client.create_collection( + name=self.directory, + get_or_create=True, + metadata={"processDate": metadata["processDate"]}, + ) + + # Reload vectorstore based on collection + vectorstore = self.getVectorStore(collection_name=collection.name) + + # Create a new parent document retriever + retriever = AsyncParentDocumentRetriever( + docstore=self.store, + vectorstore=vectorstore, + child_splitter=self.child_splitter, + parent_splitter=self.parent_splitter, + ) + + # Add documents to the collection and docstore + print(f"Adding {len(documents)} documents to collection...") + add_docs_start_time = datetime.now() + await retriever.aadd_documents( + documents=documents, add_to_docstore=True + ) + add_docs_end_time = datetime.now() + print( + f"Adding {len(documents)} documents to collection took: {add_docs_end_time - add_docs_start_time}" + ) + + documents = [] # clear documents list for next chunk + # Save metadata to the metadata.json file with open(metadata_file, "w") as metadataFile: json.dump(metadata, metadataFile, indent=4) @@ -141,38 +173,6 @@ class VectorStorage: f"{max_files} markdown file chunks processing time: {chunksEndTime - chunksStartTime}" ) - print(f"Creating new collection for {self.directory}...") - # Create or get the collection - collection = self.client.create_collection( - name=self.directory, - get_or_create=True, - metadata=metadata, - ) - - # Reload vectorstore based on collection - vectorstore = self.getVectorStore(collection_name=collection.name) - - # Create a new parent document retriever - retriever = AsyncParentDocumentRetriever( - docstore=self.store, - vectorstore=vectorstore, - child_splitter=self.child_splitter, - parent_splitter=self.parent_splitter, - ) - - # Add documents to the collection and docstore - print(f"Adding {len(documents)} documents to collection...") - add_docs_start_time = datetime.now() - await retriever.aadd_documents( - documents=documents, add_to_docstore=True - ) - add_docs_end_time = datetime.now() - print( - f"Adding {len(documents)} documents to collection took: {add_docs_end_time - add_docs_start_time}" - ) - - documents = [] # clear documents list for next chunk - subdir_end_time = datetime.now() print(f"Subdir {subdir} processing end time: {subdir_end_time}") print(f"Time taken: {subdir_end_time - subdir_start_time}")