useGPU flag & update metadata for processed files

pull/570/head
Richard Anthony Hein 8 months ago
parent 5c46393ee1
commit 7e3ba13f0f

@ -93,11 +93,13 @@ if missing_vars:
exit(1)
useMetal = os.environ.get("USE_METAL", "False") == "True"
useGPU = os.environ.get("USE_GPU", "False") == "True"
print(f"Uploads={uploads}")
print(f"MODEL_DIR={model_dir}")
print(f"MODEL_NAME={model_name}")
print(f"USE_METAL={useMetal}")
print(f"USE_GPU={useGPU}")
print(f"OPENAI_API_KEY={openai_api_key}")
print(f"OPENAI_API_BASE={openai_api_base}")
@ -145,7 +147,7 @@ if not os.path.exists(uploads):
os.makedirs(uploads)
# Initialize the vector store
vector_store = VectorStorage(directory=uploads)
vector_store = VectorStorage(directory=uploads, useGPU=useGPU)
async def create_chain(

@ -16,11 +16,11 @@ from swarms.server.async_parent_document_retriever import AsyncParentDocumentRet
store_type = "local" # "redis" or "local"
class VectorStorage:
def __init__(self, directory):
def __init__(self, directory, useGPU=False):
self.embeddings = HuggingFaceBgeEmbeddings(
cache_folder="./.embeddings",
model_name="BAAI/bge-large-en",
model_kwargs={"device": "cuda"}, # Use GPU
model_kwargs={"device": "cuda" if useGPU else "cpu"},
encode_kwargs={"normalize_embeddings": True},
query_instruction="Represent this sentence for searching relevant passages: ",
)
@ -119,7 +119,7 @@ class VectorStorage:
for file in chunk_files:
loader = UnstructuredMarkdownLoader(
file,
mode="elements",
mode="single",
strategy="fast"
)
print(f"Loaded {file} in {subdir} ...")
@ -131,6 +131,38 @@ class VectorStorage:
"processed_at": str(datetime.now())
})
print(f"Creating new collection for {self.directory}...")
# Create or get the collection
collection = self.client.create_collection(
name=self.directory,
get_or_create=True,
metadata={"processDate": metadata["processDate"]},
)
# Reload vectorstore based on collection
vectorstore = self.getVectorStore(collection_name=collection.name)
# Create a new parent document retriever
retriever = AsyncParentDocumentRetriever(
docstore=self.store,
vectorstore=vectorstore,
child_splitter=self.child_splitter,
parent_splitter=self.parent_splitter,
)
# Add documents to the collection and docstore
print(f"Adding {len(documents)} documents to collection...")
add_docs_start_time = datetime.now()
await retriever.aadd_documents(
documents=documents, add_to_docstore=True
)
add_docs_end_time = datetime.now()
print(
f"Adding {len(documents)} documents to collection took: {add_docs_end_time - add_docs_start_time}"
)
documents = [] # clear documents list for next chunk
# Save metadata to the metadata.json file
with open(metadata_file, "w") as metadataFile:
json.dump(metadata, metadataFile, indent=4)
@ -141,38 +173,6 @@ class VectorStorage:
f"{max_files} markdown file chunks processing time: {chunksEndTime - chunksStartTime}"
)
print(f"Creating new collection for {self.directory}...")
# Create or get the collection
collection = self.client.create_collection(
name=self.directory,
get_or_create=True,
metadata=metadata,
)
# Reload vectorstore based on collection
vectorstore = self.getVectorStore(collection_name=collection.name)
# Create a new parent document retriever
retriever = AsyncParentDocumentRetriever(
docstore=self.store,
vectorstore=vectorstore,
child_splitter=self.child_splitter,
parent_splitter=self.parent_splitter,
)
# Add documents to the collection and docstore
print(f"Adding {len(documents)} documents to collection...")
add_docs_start_time = datetime.now()
await retriever.aadd_documents(
documents=documents, add_to_docstore=True
)
add_docs_end_time = datetime.now()
print(
f"Adding {len(documents)} documents to collection took: {add_docs_end_time - add_docs_start_time}"
)
documents = [] # clear documents list for next chunk
subdir_end_time = datetime.now()
print(f"Subdir {subdir} processing end time: {subdir_end_time}")
print(f"Time taken: {subdir_end_time - subdir_start_time}")

Loading…
Cancel
Save