From f2912babc512544274417a7d9d220120f7cda405 Mon Sep 17 00:00:00 2001 From: Kye Date: Fri, 16 Feb 2024 12:10:20 -0800 Subject: [PATCH] [FEATS] [ DictInternalMemory DictSharedMemory LangchainChromaVectorMemory synchronized_queue TaskQueueBase] --- Cargo.lock | 323 +++++++++++++++++++++++++ Cargo.toml | 5 +- pyproject.toml | 6 +- runtime/concurrent_exec.rs | 24 +- swarms/memory/__init__.py | 6 + swarms/memory/dict_internal_memory.py | 86 +++++++ swarms/memory/dict_shared_memory.py | 98 ++++++++ swarms/memory/lanchain_chroma.py | 194 +++++++++++++++ swarms/structs/__init__.py | 1 - swarms/structs/multi_agent_rag.py | 85 ------- swarms/structs/stackoverflow_swarm.py | 12 +- swarms/structs/task_queue_base.py | 81 +++++++ swarms/structs/test_majority_voting.py | 151 ++++++++++++ tests/structs/test_majority_voting.py | 151 ++++++++++++ 14 files changed, 1111 insertions(+), 112 deletions(-) create mode 100644 Cargo.lock create mode 100644 swarms/memory/dict_internal_memory.py create mode 100644 swarms/memory/dict_shared_memory.py create mode 100644 swarms/memory/lanchain_chroma.py delete mode 100644 swarms/structs/multi_agent_rag.py create mode 100644 swarms/structs/task_queue_base.py create mode 100644 swarms/structs/test_majority_voting.py create mode 100644 tests/structs/test_majority_voting.py diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 00000000..1b078be7 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,323 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "crossbeam-deque" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" + +[[package]] +name = "either" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a" + +[[package]] +name = "engine" +version = "0.1.0" +dependencies = [ + "log", + "pyo3", + "rayon", +] + +[[package]] +name = "indoc" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47741a8bc60fb26eb8d6e0238bbb26d8575ff623fdc97b1a2c00c050b9684ed8" +dependencies = [ + "indoc-impl", + "proc-macro-hack", +] + +[[package]] +name = "indoc-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce046d161f000fffde5f432a0d034d0341dc152643b2598ed5bfce44c4f3a8f0" +dependencies = [ + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", + "unindent", +] + +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "libc" +version = "0.2.153" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" + +[[package]] +name = "lock_api" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" + +[[package]] +name = "once_cell" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" + +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall", + "smallvec", + "winapi", +] + +[[package]] +name = "paste" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45ca20c77d80be666aef2b45486da86238fabe33e38306bd3118fe4af33fa880" +dependencies = [ + "paste-impl", + "proc-macro-hack", +] + +[[package]] +name = "paste-impl" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d95a7db200b97ef370c8e6de0088252f7e0dfff7d047a28528e47456c0fc98b6" +dependencies = [ + "proc-macro-hack", +] + +[[package]] +name = "proc-macro-hack" +version = "0.5.20+deprecated" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" + +[[package]] +name = "proc-macro2" +version = "1.0.78" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "pyo3" +version = "0.15.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d41d50a7271e08c7c8a54cd24af5d62f73ee3a6f6a314215281ebdec421d5752" +dependencies = [ + "cfg-if", + "indoc", + "libc", + "parking_lot", + "paste", + "pyo3-build-config", + "pyo3-macros", + "unindent", +] + +[[package]] +name = "pyo3-build-config" +version = "0.15.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "779239fc40b8e18bc8416d3a37d280ca9b9fb04bda54b98037bb6748595c2410" +dependencies = [ + "once_cell", +] + +[[package]] +name = "pyo3-macros" +version = "0.15.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b247e8c664be87998d8628e86f282c25066165f1f8dda66100c48202fdb93a" +dependencies = [ + "pyo3-macros-backend", + "quote", + "syn", +] + +[[package]] +name = "pyo3-macros-backend" +version = "0.15.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a8c2812c412e00e641d99eeb79dd478317d981d938aa60325dfa7157b607095" +dependencies = [ + "proc-macro2", + "pyo3-build-config", + "quote", + "syn", +] + +[[package]] +name = "quote" +version = "1.0.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rayon" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7237101a77a10773db45d62004a272517633fbcc3df19d96455ede1122e051" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + +[[package]] +name = "redox_syscall" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags", +] + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "smallvec" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" + +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "unicode-ident" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" + +[[package]] +name = "unindent" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1766d682d402817b5ac4490b3c3002d91dfa0d22812f341609f97b08757359c" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" diff --git a/Cargo.toml b/Cargo.toml index a4fd0c06..142a8652 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,13 +5,10 @@ edition = "2018" [lib] name = "engine" -path = "src/my_lib.rs" +path = "runtime/concurrent_exec.rs" crate-type = ["cdylib"] [dependencies] pyo3 = { version = "0.15", features = ["extension-module"] } rayon = "1.5.1" log = "0.4.14" -rustcuda = "0.1.0" -rustcuda_derive = "*" -rustcuda_core = "0.1" diff --git a/pyproject.toml b/pyproject.toml index 99b6a226..7d95de7e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,10 +1,12 @@ [build-system] -requires = ["poetry-core>=1.0.0"] +requires = ["poetry-core>=1.0.0", "maturin"] build-backend = "poetry.core.masonry.api" + + [tool.poetry] name = "swarms" -version = "4.1.4" +version = "4.1.5" description = "Swarms - Pytorch" license = "MIT" authors = ["Kye Gomez "] diff --git a/runtime/concurrent_exec.rs b/runtime/concurrent_exec.rs index 2d3bb504..e1f23533 100644 --- a/runtime/concurrent_exec.rs +++ b/runtime/concurrent_exec.rs @@ -1,7 +1,7 @@ use pyo3::prelude::*; use pyo3::wrap_pyfunction; use pyo3::types::IntoPyDict; -use rayon::{ThreadPool, ThreadPoolBuilder, prelude::*}; +use rayon::{ThreadPool, ThreadPoolBuilder}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use std::thread; @@ -13,16 +13,6 @@ fn rust_module(py: Python, m: &PyModule) -> PyResult<()> { Ok(()) } -#[pyfunction] -pub fn concurrent_exec( - py_codes: Vec<&str>, - timeout: Option, - num_threads: usize, - error_handler: F, - log_function: G, - result_handler: H, -) -> PyResult>> - /// This function wraps Python code in Rust concurrency for ultra high performance. /// /// # Arguments @@ -45,6 +35,16 @@ pub fn concurrent_exec( /// let result_handler = |r| println!("Result: {:?}", r); /// execute_python_codes(py_codes, timeout, num_threads, error_handler, log_function, result_handler); /// ``` + +#[pyfunction] +pub fn concurrent_exec( + py_codes: Vec<&str>, + timeout: Option, + num_threads: usize, + error_handler: F, + log_function: G, + result_handler: H, +) -> PyResult>> where F: Fn(&str), G: Fn(&str), @@ -83,7 +83,7 @@ where None => {} } - results.lock().unwrap().push(result.clone()); + results.lock().unwrap().push(result.clone(result)); result_handler(&result); }); }); diff --git a/swarms/memory/__init__.py b/swarms/memory/__init__.py index 2dca8172..89c2fddf 100644 --- a/swarms/memory/__init__.py +++ b/swarms/memory/__init__.py @@ -6,6 +6,9 @@ from swarms.memory.weaviate_db import WeaviateDB from swarms.memory.visual_memory import VisualShortTermMemory from swarms.memory.action_subtask import ActionSubtaskEntry from swarms.memory.chroma_db import ChromaDB +from swarms.memory.dict_internal_memory import DictInternalMemory +from swarms.memory.dict_shared_memory import DictSharedMemory +from swarms.memory.lanchain_chroma import LangchainChromaVectorMemory __all__ = [ "AbstractVectorDatabase", @@ -16,4 +19,7 @@ __all__ = [ "VisualShortTermMemory", "ActionSubtaskEntry", "ChromaDB", + "DictInternalMemory", + "DictSharedMemory", + "LangchainChromaVectorMemory", ] diff --git a/swarms/memory/dict_internal_memory.py b/swarms/memory/dict_internal_memory.py new file mode 100644 index 00000000..daba0b0d --- /dev/null +++ b/swarms/memory/dict_internal_memory.py @@ -0,0 +1,86 @@ +import uuid +from abc import ABC, abstractmethod +from typing import Any, Dict, List, Tuple + + +class InternalMemoryBase(ABC): + """Abstract base class for internal memory of agents in the swarm.""" + + def __init__(self, n_entries): + """Initialize the internal memory. In the current architecture the memory always consists of a set of soltuions or evaluations. + During the operation, the agent should retrivie best solutions from it's internal memory based on the score. + + Moreover, the project is designed around LLMs for the proof of concepts, so we treat all entry content as a string. + """ + self.n_entries = n_entries + + @abstractmethod + def add(self, score, entry): + """Add an entry to the internal memory.""" + raise NotImplementedError + + @abstractmethod + def get_top_n(self, n): + """Get the top n entries from the internal memory.""" + raise NotImplementedError + + +class DictInternalMemory(InternalMemoryBase): + def __init__(self, n_entries: int): + """ + Initialize the internal memory. In the current architecture the memory always consists of a set of solutions or evaluations. + Simple key-value store for now. + + Args: + n_entries (int): The maximum number of entries to keep in the internal memory. + """ + super().__init__(n_entries) + self.data: Dict[str, Dict[str, Any]] = {} + + def add(self, score: float, content: Any) -> None: + """ + Add an entry to the internal memory. + + Args: + score (float): The score or fitness value associated with the entry. + content (Any): The content of the entry. + + Returns: + None + """ + random_key: str = str(uuid.uuid4()) + self.data[random_key] = {"score": score, "content": content} + + # keep only the best n entries + sorted_data: List[Tuple[str, Dict[str, Any]]] = sorted( + self.data.items(), + key=lambda x: x[1]["score"], + reverse=True, + ) + self.data = dict(sorted_data[: self.n_entries]) + + def get_top_n(self, n: int) -> List[Tuple[str, Dict[str, Any]]]: + """ + Get the top n entries from the internal memory. + + Args: + n (int): The number of top entries to retrieve. + + Returns: + List[Tuple[str, Dict[str, Any]]]: A list of tuples containing the random keys and corresponding entry data. + """ + sorted_data: List[Tuple[str, Dict[str, Any]]] = sorted( + self.data.items(), + key=lambda x: x[1]["score"], + reverse=True, + ) + return sorted_data[:n] + + def len(self) -> int: + """ + Get the number of entries in the internal memory. + + Returns: + int: The number of entries in the internal memory. + """ + return len(self.data) diff --git a/swarms/memory/dict_shared_memory.py b/swarms/memory/dict_shared_memory.py new file mode 100644 index 00000000..a8b78be7 --- /dev/null +++ b/swarms/memory/dict_shared_memory.py @@ -0,0 +1,98 @@ +import datetime +import json +import os +import threading +import uuid +from pathlib import Path +from typing import Dict, Any + + +class DictSharedMemory: + """A class representing a shared memory that stores entries as a dictionary. + + Attributes: + file_loc (Path): The file location where the memory is stored. + lock (threading.Lock): A lock used for thread synchronization. + + Methods: + __init__(self, file_loc: str = None) -> None: Initializes the shared memory. + add_entry(self, score: float, agent_id: str, agent_cycle: int, entry: Any) -> bool: Adds an entry to the internal memory. + get_top_n(self, n: int) -> None: Gets the top n entries from the internal memory. + write_to_file(self, data: Dict[str, Dict[str, Any]]) -> bool: Writes the internal memory to a file. + """ + + def __init__(self, file_loc: str = None) -> None: + """Initialize the shared memory. In the current architecture the memory always consists of a set of soltuions or evaluations. + Moreover, the project is designed around LLMs for the proof of concepts, so we treat all entry content as a string. + """ + if file_loc is not None: + self.file_loc = Path(file_loc) + if not self.file_loc.exists(): + self.file_loc.touch() + + self.lock = threading.Lock() + + def add( + self, + score: float, + agent_id: str, + agent_cycle: int, + entry: Any, + ) -> bool: + """Add an entry to the internal memory.""" + with self.lock: + entry_id = str(uuid.uuid4()) + data = {} + epoch = datetime.datetime.utcfromtimestamp(0) + epoch = ( + datetime.datetime.utcnow() - epoch + ).total_seconds() + data[entry_id] = { + "agent": agent_id, + "epoch": epoch, + "score": score, + "cycle": agent_cycle, + "content": entry, + } + status = self.write_to_file(data) + self.plot_performance() + return status + + def get_top_n(self, n: int) -> None: + """Get the top n entries from the internal memory.""" + with self.lock: + with open(self.file_loc, "r") as f: + try: + file_data = json.load(f) + except Exception as e: + file_data = {} + raise e + + sorted_data = dict( + sorted( + file_data.items(), + key=lambda item: item[1]["score"], + reverse=True, + ) + ) + top_n = dict(list(sorted_data.items())[:n]) + return top_n + + def write_to_file(self, data: Dict[str, Dict[str, Any]]) -> bool: + """Write the internal memory to a file.""" + if self.file_loc is not None: + with open(self.file_loc, "r") as f: + try: + file_data = json.load(f) + except Exception as e: + file_data = {} + raise e + + file_data = file_data | data + with open(self.file_loc, "w") as f: + json.dump(file_data, f, indent=4) + + f.flush() + os.fsync(f.fileno()) + + return True diff --git a/swarms/memory/lanchain_chroma.py b/swarms/memory/lanchain_chroma.py new file mode 100644 index 00000000..07846fd6 --- /dev/null +++ b/swarms/memory/lanchain_chroma.py @@ -0,0 +1,194 @@ +import threading +from pathlib import Path + +from langchain.chains import RetrievalQA +from langchain.chains.question_answering import load_qa_chain +from swarms.models.openai_models import OpenAIChat +from langchain.embeddings.openai import OpenAIEmbeddings +from langchain.text_splitter import CharacterTextSplitter +from langchain.vectorstores import Chroma + + +def synchronized_mem(method): + """ + Decorator that synchronizes access to a method using a lock. + + Args: + method: The method to be decorated. + + Returns: + The decorated method. + """ + + def wrapper(self, *args, **kwargs): + with self.lock: + try: + return method(self, *args, **kwargs) + except Exception as e: + print(f"Failed to execute {method.__name__}: {e}") + + return wrapper + + +class LangchainChromaVectorMemory: + """ + A class representing a vector memory for storing and retrieving text entries. + + Attributes: + loc (str): The location of the vector memory. + chunk_size (int): The size of each text chunk. + chunk_overlap_frac (float): The fraction of overlap between text chunks. + embeddings (OpenAIEmbeddings): The embeddings used for text representation. + count (int): The current count of text entries in the vector memory. + lock (threading.Lock): A lock for thread safety. + db (Chroma): The Chroma database for storing text entries. + qa (RetrievalQA): The retrieval QA system for answering questions. + + Methods: + __init__: Initializes the VectorMemory object. + _init_db: Initializes the Chroma database. + _init_retriever: Initializes the retrieval QA system. + add_entry: Adds an entry to the vector memory. + search_memory: Searches the vector memory for similar entries. + ask_question: Asks a question to the vector memory. + """ + def __init__( + self, + loc=None, + chunk_size: int = 1000, + chunk_overlap_frac: float = 0.1, + *args, + **kwargs, + ): + """ + Initializes the VectorMemory object. + + Args: + loc (str): The location of the vector memory. If None, defaults to "./tmp/vector_memory". + chunk_size (int): The size of each text chunk. + chunk_overlap_frac (float): The fraction of overlap between text chunks. + """ + if loc is None: + loc = "./tmp/vector_memory" + self.loc = Path(loc) + self.chunk_size = chunk_size + self.chunk_overlap = chunk_size * chunk_overlap_frac + self.embeddings = OpenAIEmbeddings() + self.count = 0 + self.lock = threading.Lock() + + self.db = self._init_db() + self.qa = self._init_retriever() + + def _init_db(self): + """ + Initializes the Chroma database. + + Returns: + Chroma: The initialized Chroma database. + """ + texts = [ + "init" + ] # TODO find how to initialize Chroma without any text + chroma_db = Chroma.from_texts( + texts=texts, + embedding=self.embeddings, + persist_directory=str(self.loc), + ) + self.count = chroma_db._collection.count() + return chroma_db + + def _init_retriever(self): + """ + Initializes the retrieval QA system. + + Returns: + RetrievalQA: The initialized retrieval QA system. + """ + model = OpenAIChat( + model_name="gpt-3.5-turbo", + ) + qa_chain = load_qa_chain(model, chain_type="stuff") + retriever = self.db.as_retriever( + search_type="mmr", search_kwargs={"k": 10} + ) + qa = RetrievalQA( + combine_documents_chain=qa_chain, retriever=retriever + ) + return qa + + @synchronized_mem + def add(self, entry: str): + """ + Add an entry to the internal memory. + + Args: + entry (str): The entry to be added. + + Returns: + bool: True if the entry was successfully added, False otherwise. + """ + text_splitter = CharacterTextSplitter( + chunk_size=self.chunk_size, + chunk_overlap=self.chunk_overlap, + separator=" ", + ) + texts = text_splitter.split_text(entry) + + self.db.add_texts(texts) + self.count += self.db._collection.count() + self.db.persist() + return True + + @synchronized_mem + def search_memory( + self, query: str, k=10, type="mmr", distance_threshold=0.5 + ): + """ + Searching the vector memory for similar entries. + + Args: + query (str): The query to search for. + k (int): The number of results to return. + type (str): The type of search to perform: "cos" or "mmr". + distance_threshold (float): The similarity threshold to use for the search. Results with distance > similarity_threshold will be dropped. + + Returns: + list[str]: A list of the top k results. + """ + self.count = self.db._collection.count() + if k > self.count: + k = self.count - 1 + if k <= 0: + return None + + if type == "mmr": + texts = self.db.max_marginal_relevance_search( + query=query, k=k, fetch_k=min(20, self.count) + ) + texts = [text.page_content for text in texts] + elif type == "cos": + texts = self.db.similarity_search_with_score( + query=query, k=k + ) + texts = [ + text[0].page_content + for text in texts + if text[-1] < distance_threshold + ] + + return texts + + @synchronized_mem + def query(self, question: str): + """ + Ask a question to the vector memory. + + Args: + question (str): The question to ask. + + Returns: + str: The answer to the question. + """ + answer = self.qa.run(question) + return answer diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index c8da387a..b8dfc929 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -17,7 +17,6 @@ from swarms.structs.recursive_workflow import RecursiveWorkflow from swarms.structs.schemas import ( Artifact, ArtifactUpload, - Step, StepInput, StepOutput, StepRequestBody, diff --git a/swarms/structs/multi_agent_rag.py b/swarms/structs/multi_agent_rag.py deleted file mode 100644 index 91d8c39d..00000000 --- a/swarms/structs/multi_agent_rag.py +++ /dev/null @@ -1,85 +0,0 @@ -from dataclasses import dataclass -from typing import List, Optional - -from swarms.memory.base_vectordatabase import AbstractVectorDatabase -from swarms.structs.agent import Agent - - -@dataclass -class MultiAgentRag: - """ - Represents a multi-agent RAG (Relational Agent Graph) structure. - - Attributes: - agents (List[Agent]): List of agents in the multi-agent RAG. - db (AbstractVectorDatabase): Database used for querying. - verbose (bool): Flag indicating whether to print verbose output. - """ - - agents: List[Agent] - db: AbstractVectorDatabase - verbose: bool = False - - def query_database(self, query: str): - """ - Queries the database using the given query string. - - Args: - query (str): The query string. - - Returns: - List: The list of results from the database. - """ - results = [] - for agent in self.agents: - agent_results = agent.long_term_memory_prompt(query) - results.extend(agent_results) - return results - - def get_agent_by_id(self, agent_id) -> Optional[Agent]: - """ - Retrieves an agent from the multi-agent RAG by its ID. - - Args: - agent_id: The ID of the agent to retrieve. - - Returns: - Agent or None: The agent with the specified ID, or None if not found. - """ - for agent in self.agents: - if agent.agent_id == agent_id: - return agent - return None - - def add_message( - self, sender: Agent, message: str, *args, **kwargs - ): - """ - Adds a message to the database. - - Args: - sender (Agent): The agent sending the message. - message (str): The message to add. - *args: Additional positional arguments. - **kwargs: Additional keyword arguments. - - Returns: - int: The ID of the added message. - """ - doc = f"{sender.ai_name}: {message}" - - return self.db.add(doc) - - def query(self, message: str, *args, **kwargs): - """ - Queries the database using the given message. - - Args: - message (str): The message to query. - *args: Additional positional arguments. - **kwargs: Additional keyword arguments. - - Returns: - List: The list of results from the database. - """ - return self.db.query(message) diff --git a/swarms/structs/stackoverflow_swarm.py b/swarms/structs/stackoverflow_swarm.py index b003c9b7..a1a1c28a 100644 --- a/swarms/structs/stackoverflow_swarm.py +++ b/swarms/structs/stackoverflow_swarm.py @@ -52,14 +52,12 @@ class StackOverflowSwarm(BaseMultiAgentStructure): # Counter for the number of upvotes per post self.upvotes = 0 - + # Counter for the number of downvotes per post self.downvotes = 0 - + # Forum for the agents to interact self.forum = [] - - def run(self, task: str, *args, **kwargs): """ @@ -85,9 +83,7 @@ class StackOverflowSwarm(BaseMultiAgentStructure): **kwargs, ) # Add to the conversation - self.conversation.add( - agent.ai_name, f"{response}" - ) + self.conversation.add(agent.ai_name, f"{response}") logger.info(f"[{agent.ai_name}]: [{response}]") - + return self.conversation.return_history_as_string() diff --git a/swarms/structs/task_queue_base.py b/swarms/structs/task_queue_base.py new file mode 100644 index 00000000..ee2eb80a --- /dev/null +++ b/swarms/structs/task_queue_base.py @@ -0,0 +1,81 @@ +import threading +from abc import ABC, abstractmethod +from swarms.structs.agent import Agent +from swarms.structs.task import Task + + +def synchronized_queue(method): + """ + Decorator that synchronizes access to the decorated method using a lock. + The lock is acquired before executing the method and released afterwards. + + Args: + method: The method to be decorated. + + Returns: + The decorated method. + """ + timeout_sec = 5 + + def wrapper(self, *args, **kwargs): + with self.lock: + self.lock.acquire(timeout=timeout_sec) + try: + return method(self, *args, **kwargs) + except Exception as e: + print(f"Failed to execute {method.__name__}: {e}") + finally: + self.lock.release() + + return wrapper + + +class TaskQueueBase(ABC): + def __init__(self): + self.lock = threading.Lock() + + @synchronized_queue + @abstractmethod + def add_task(self, task: Task) -> bool: + """Adds a task to the queue. + + Args: + task (Task): The task to be added to the queue. + + Returns: + bool: True if the task was successfully added, False otherwise. + """ + raise NotImplementedError + + @synchronized_queue + @abstractmethod + def get_task(self, agent: Agent) -> Task: + """Gets the next task from the queue. + + Args: + agent (Agent): The agent requesting the task. + + Returns: + Task: The next task from the queue. + """ + raise NotImplementedError + + @synchronized_queue + @abstractmethod + def complete_task(self, task_id: str): + """Sets the task as completed. + + Args: + task_id (str): The ID of the task to be marked as completed. + """ + raise NotImplementedError + + @synchronized_queue + @abstractmethod + def reset_task(self, task_id: str): + """Resets the task if the agent failed to complete it. + + Args: + task_id (str): The ID of the task to be reset. + """ + raise NotImplementedError diff --git a/swarms/structs/test_majority_voting.py b/swarms/structs/test_majority_voting.py new file mode 100644 index 00000000..45474a2d --- /dev/null +++ b/swarms/structs/test_majority_voting.py @@ -0,0 +1,151 @@ +from unittest.mock import MagicMock +import pytest + +from swarms.structs.agent import Agent +from swarms.structs.majority_voting import MajorityVoting + + +def test_majority_voting_run_concurrent(mocker): + # Create mock agents + agent1 = MagicMock(spec=Agent) + agent2 = MagicMock(spec=Agent) + agent3 = MagicMock(spec=Agent) + + # Create mock majority voting + mv = MajorityVoting( + agents=[agent1, agent2, agent3], + concurrent=True, + multithreaded=False, + ) + + # Create mock conversation + conversation = MagicMock() + mv.conversation = conversation + + # Create mock results + results = ["Paris", "Paris", "Lyon"] + + # Mock agent.run method + agent1.run.return_value = results[0] + agent2.run.return_value = results[1] + agent3.run.return_value = results[2] + + # Run majority voting + majority_vote = mv.run("What is the capital of France?") + + # Assert agent.run method was called with the correct task + agent1.run.assert_called_once_with( + "What is the capital of France?" + ) + agent2.run.assert_called_once_with( + "What is the capital of France?" + ) + agent3.run.assert_called_once_with( + "What is the capital of France?" + ) + + # Assert conversation.add method was called with the correct responses + conversation.add.assert_any_call(agent1.agent_name, results[0]) + conversation.add.assert_any_call(agent2.agent_name, results[1]) + conversation.add.assert_any_call(agent3.agent_name, results[2]) + + # Assert majority vote is correct + assert majority_vote is not None + + +def test_majority_voting_run_multithreaded(mocker): + # Create mock agents + agent1 = MagicMock(spec=Agent) + agent2 = MagicMock(spec=Agent) + agent3 = MagicMock(spec=Agent) + + # Create mock majority voting + mv = MajorityVoting( + agents=[agent1, agent2, agent3], + concurrent=False, + multithreaded=True, + ) + + # Create mock conversation + conversation = MagicMock() + mv.conversation = conversation + + # Create mock results + results = ["Paris", "Paris", "Lyon"] + + # Mock agent.run method + agent1.run.return_value = results[0] + agent2.run.return_value = results[1] + agent3.run.return_value = results[2] + + # Run majority voting + majority_vote = mv.run("What is the capital of France?") + + # Assert agent.run method was called with the correct task + agent1.run.assert_called_once_with( + "What is the capital of France?" + ) + agent2.run.assert_called_once_with( + "What is the capital of France?" + ) + agent3.run.assert_called_once_with( + "What is the capital of France?" + ) + + # Assert conversation.add method was called with the correct responses + conversation.add.assert_any_call(agent1.agent_name, results[0]) + conversation.add.assert_any_call(agent2.agent_name, results[1]) + conversation.add.assert_any_call(agent3.agent_name, results[2]) + + # Assert majority vote is correct + assert majority_vote is not None + + +@pytest.mark.asyncio +async def test_majority_voting_run_asynchronous(mocker): + # Create mock agents + agent1 = MagicMock(spec=Agent) + agent2 = MagicMock(spec=Agent) + agent3 = MagicMock(spec=Agent) + + # Create mock majority voting + mv = MajorityVoting( + agents=[agent1, agent2, agent3], + concurrent=False, + multithreaded=False, + asynchronous=True, + ) + + # Create mock conversation + conversation = MagicMock() + mv.conversation = conversation + + # Create mock results + results = ["Paris", "Paris", "Lyon"] + + # Mock agent.run method + agent1.run.return_value = results[0] + agent2.run.return_value = results[1] + agent3.run.return_value = results[2] + + # Run majority voting + majority_vote = await mv.run("What is the capital of France?") + + # Assert agent.run method was called with the correct task + agent1.run.assert_called_once_with( + "What is the capital of France?" + ) + agent2.run.assert_called_once_with( + "What is the capital of France?" + ) + agent3.run.assert_called_once_with( + "What is the capital of France?" + ) + + # Assert conversation.add method was called with the correct responses + conversation.add.assert_any_call(agent1.agent_name, results[0]) + conversation.add.assert_any_call(agent2.agent_name, results[1]) + conversation.add.assert_any_call(agent3.agent_name, results[2]) + + # Assert majority vote is correct + assert majority_vote is not None diff --git a/tests/structs/test_majority_voting.py b/tests/structs/test_majority_voting.py new file mode 100644 index 00000000..45474a2d --- /dev/null +++ b/tests/structs/test_majority_voting.py @@ -0,0 +1,151 @@ +from unittest.mock import MagicMock +import pytest + +from swarms.structs.agent import Agent +from swarms.structs.majority_voting import MajorityVoting + + +def test_majority_voting_run_concurrent(mocker): + # Create mock agents + agent1 = MagicMock(spec=Agent) + agent2 = MagicMock(spec=Agent) + agent3 = MagicMock(spec=Agent) + + # Create mock majority voting + mv = MajorityVoting( + agents=[agent1, agent2, agent3], + concurrent=True, + multithreaded=False, + ) + + # Create mock conversation + conversation = MagicMock() + mv.conversation = conversation + + # Create mock results + results = ["Paris", "Paris", "Lyon"] + + # Mock agent.run method + agent1.run.return_value = results[0] + agent2.run.return_value = results[1] + agent3.run.return_value = results[2] + + # Run majority voting + majority_vote = mv.run("What is the capital of France?") + + # Assert agent.run method was called with the correct task + agent1.run.assert_called_once_with( + "What is the capital of France?" + ) + agent2.run.assert_called_once_with( + "What is the capital of France?" + ) + agent3.run.assert_called_once_with( + "What is the capital of France?" + ) + + # Assert conversation.add method was called with the correct responses + conversation.add.assert_any_call(agent1.agent_name, results[0]) + conversation.add.assert_any_call(agent2.agent_name, results[1]) + conversation.add.assert_any_call(agent3.agent_name, results[2]) + + # Assert majority vote is correct + assert majority_vote is not None + + +def test_majority_voting_run_multithreaded(mocker): + # Create mock agents + agent1 = MagicMock(spec=Agent) + agent2 = MagicMock(spec=Agent) + agent3 = MagicMock(spec=Agent) + + # Create mock majority voting + mv = MajorityVoting( + agents=[agent1, agent2, agent3], + concurrent=False, + multithreaded=True, + ) + + # Create mock conversation + conversation = MagicMock() + mv.conversation = conversation + + # Create mock results + results = ["Paris", "Paris", "Lyon"] + + # Mock agent.run method + agent1.run.return_value = results[0] + agent2.run.return_value = results[1] + agent3.run.return_value = results[2] + + # Run majority voting + majority_vote = mv.run("What is the capital of France?") + + # Assert agent.run method was called with the correct task + agent1.run.assert_called_once_with( + "What is the capital of France?" + ) + agent2.run.assert_called_once_with( + "What is the capital of France?" + ) + agent3.run.assert_called_once_with( + "What is the capital of France?" + ) + + # Assert conversation.add method was called with the correct responses + conversation.add.assert_any_call(agent1.agent_name, results[0]) + conversation.add.assert_any_call(agent2.agent_name, results[1]) + conversation.add.assert_any_call(agent3.agent_name, results[2]) + + # Assert majority vote is correct + assert majority_vote is not None + + +@pytest.mark.asyncio +async def test_majority_voting_run_asynchronous(mocker): + # Create mock agents + agent1 = MagicMock(spec=Agent) + agent2 = MagicMock(spec=Agent) + agent3 = MagicMock(spec=Agent) + + # Create mock majority voting + mv = MajorityVoting( + agents=[agent1, agent2, agent3], + concurrent=False, + multithreaded=False, + asynchronous=True, + ) + + # Create mock conversation + conversation = MagicMock() + mv.conversation = conversation + + # Create mock results + results = ["Paris", "Paris", "Lyon"] + + # Mock agent.run method + agent1.run.return_value = results[0] + agent2.run.return_value = results[1] + agent3.run.return_value = results[2] + + # Run majority voting + majority_vote = await mv.run("What is the capital of France?") + + # Assert agent.run method was called with the correct task + agent1.run.assert_called_once_with( + "What is the capital of France?" + ) + agent2.run.assert_called_once_with( + "What is the capital of France?" + ) + agent3.run.assert_called_once_with( + "What is the capital of France?" + ) + + # Assert conversation.add method was called with the correct responses + conversation.add.assert_any_call(agent1.agent_name, results[0]) + conversation.add.assert_any_call(agent2.agent_name, results[1]) + conversation.add.assert_any_call(agent3.agent_name, results[2]) + + # Assert majority vote is correct + assert majority_vote is not None