diff --git a/examples/typedb_example.py b/examples/typedb_example.py new file mode 100644 index 00000000..8eacdc94 --- /dev/null +++ b/examples/typedb_example.py @@ -0,0 +1,106 @@ +from swarms.utils.typedb_wrapper import TypeDBWrapper, TypeDBConfig + +def main(): + # Initialize TypeDB wrapper with custom configuration + config = TypeDBConfig( + uri="localhost:1729", + database="swarms_example", + username="admin", + password="password" + ) + + # Define schema for a simple knowledge graph + schema = """ + define + person sub entity, + owns name: string, + owns age: long, + plays role; + + role sub entity, + owns title: string, + owns department: string; + + works_at sub relation, + relates person, + relates role; + """ + + # Example data insertion + insert_queries = [ + """ + insert + $p isa person, has name "John Doe", has age 30; + $r isa role, has title "Software Engineer", has department "Engineering"; + (person: $p, role: $r) isa works_at; + """, + """ + insert + $p isa person, has name "Jane Smith", has age 28; + $r isa role, has title "Data Scientist", has department "Data Science"; + (person: $p, role: $r) isa works_at; + """ + ] + + # Example queries + query_queries = [ + # Get all people + "match $p isa person; get;", + + # Get people in Engineering department + """ + match + $p isa person; + $r isa role, has department "Engineering"; + (person: $p, role: $r) isa works_at; + get $p; + """, + + # Get people with their roles + """ + match + $p isa person, has name $n; + $r isa role, has title $t; + (person: $p, role: $r) isa works_at; + get $n, $t; + """ + ] + + try: + with TypeDBWrapper(config) as db: + # Define schema + print("Defining schema...") + db.define_schema(schema) + + # Insert data + print("\nInserting data...") + for query in insert_queries: + db.insert_data(query) + + # Query data + print("\nQuerying data...") + for i, query in enumerate(query_queries, 1): + print(f"\nQuery {i}:") + results = db.query_data(query) + print(f"Results: {results}") + + # Example of deleting data + print("\nDeleting data...") + delete_query = """ + match + $p isa person, has name "John Doe"; + delete $p; + """ + db.delete_data(delete_query) + + # Verify deletion + print("\nVerifying deletion...") + verify_query = "match $p isa person, has name $n; get $n;" + results = db.query_data(verify_query) + print(f"Remaining people: {results}") + + except Exception as e: + print(f"Error: {e}") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index e2602db1..6b50d912 100644 --- a/requirements.txt +++ b/requirements.txt @@ -29,3 +29,6 @@ flake8-comprehensions>=3.12.0 flake8-simplify>=0.19.3 flake8-unused-arguments>=0.0.4 pyupgrade>=3.15.0 +typedb-client>=2.25.0 +typedb-protocol>=2.25.0 +typedb-driver>=2.25.0 diff --git a/swarms/utils/typedb_wrapper.py b/swarms/utils/typedb_wrapper.py new file mode 100644 index 00000000..8a0b5396 --- /dev/null +++ b/swarms/utils/typedb_wrapper.py @@ -0,0 +1,168 @@ +from typing import Dict, List, Optional, Any, Union +from loguru import logger +from typedb.client import TypeDB, SessionType, TransactionType +from typedb.api.connection.transaction import Transaction +from dataclasses import dataclass +import json + +@dataclass +class TypeDBConfig: + """Configuration for TypeDB connection.""" + uri: str = "localhost:1729" + database: str = "swarms" + username: Optional[str] = None + password: Optional[str] = None + timeout: int = 30 + +class TypeDBWrapper: + """ + A wrapper class for TypeDB that provides graph database operations for Swarms. + This class handles connection, schema management, and data operations. + """ + + def __init__(self, config: Optional[TypeDBConfig] = None): + """ + Initialize the TypeDB wrapper with the given configuration. + Args: + config (Optional[TypeDBConfig]): Configuration for TypeDB connection. + """ + self.config = config or TypeDBConfig() + self.client = None + self.session = None + self._connect() + + def _connect(self) -> None: + """Establish connection to TypeDB.""" + try: + self.client = TypeDB.core_client(self.config.uri) + if self.config.username and self.config.password: + self.session = self.client.session( + self.config.database, + SessionType.DATA, + self.config.username, + self.config.password + ) + else: + self.session = self.client.session( + self.config.database, + SessionType.DATA + ) + logger.info(f"Connected to TypeDB at {self.config.uri}") + except Exception as e: + logger.error(f"Failed to connect to TypeDB: {e}") + raise + + def _ensure_connection(self) -> None: + """Ensure connection is active, reconnect if necessary.""" + if not self.session or not self.session.is_open(): + self._connect() + + def define_schema(self, schema: str) -> None: + """ + Define the database schema. + Args: + schema (str): TypeQL schema definition. + """ + try: + with self.session.transaction(TransactionType.WRITE) as transaction: + transaction.query.define(schema) + transaction.commit() + logger.info("Schema defined successfully") + except Exception as e: + logger.error(f"Failed to define schema: {e}") + raise + + def insert_data(self, query: str) -> None: + """ + Insert data using TypeQL query. + Args: + query (str): TypeQL insert query. + """ + try: + with self.session.transaction(TransactionType.WRITE) as transaction: + transaction.query.insert(query) + transaction.commit() + logger.info("Data inserted successfully") + except Exception as e: + logger.error(f"Failed to insert data: {e}") + raise + + def query_data(self, query: str) -> List[Dict[str, Any]]: + """ + Query data using TypeQL query. + Args: + query (str): TypeQL query. + Returns: + List[Dict[str, Any]]: Query results. + """ + try: + with self.session.transaction(TransactionType.READ) as transaction: + result = transaction.query.get(query) + return [self._convert_concept_to_dict(concept) for concept in result] + except Exception as e: + logger.error(f"Failed to query data: {e}") + raise + + def _convert_concept_to_dict(self, concept: Any) -> Dict[str, Any]: + """ + Convert a TypeDB concept to a dictionary. + Args: + concept: TypeDB concept. + Returns: + Dict[str, Any]: Dictionary representation of the concept. + """ + try: + if hasattr(concept, "get_type"): + concept_type = concept.get_type() + if hasattr(concept, "get_value"): + return { + "type": concept_type.get_label_name(), + "value": concept.get_value() + } + elif hasattr(concept, "get_attributes"): + return { + "type": concept_type.get_label_name(), + "attributes": { + attr.get_type().get_label_name(): attr.get_value() + for attr in concept.get_attributes() + } + } + return {"type": "unknown", "value": str(concept)} + except Exception as e: + logger.error(f"Failed to convert concept to dict: {e}") + return {"type": "error", "value": str(e)} + + def delete_data(self, query: str) -> None: + """ + Delete data using TypeQL query. + Args: + query (str): TypeQL delete query. + """ + try: + with self.session.transaction(TransactionType.WRITE) as transaction: + transaction.query.delete(query) + transaction.commit() + logger.info("Data deleted successfully") + except Exception as e: + logger.error(f"Failed to delete data: {e}") + raise + + def close(self) -> None: + """Close the TypeDB connection.""" + try: + if self.session: + self.session.close() + if self.client: + self.client.close() + logger.info("TypeDB connection closed") + except Exception as e: + logger.error(f"Failed to close TypeDB connection: {e}") + raise + + def __enter__(self): + """Context manager entry.""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit.""" + self.close() \ No newline at end of file diff --git a/tests/utils/test_typedb_wrapper.py b/tests/utils/test_typedb_wrapper.py new file mode 100644 index 00000000..076f5461 --- /dev/null +++ b/tests/utils/test_typedb_wrapper.py @@ -0,0 +1,129 @@ +import pytest +from unittest.mock import Mock, patch +from swarms.utils.typedb_wrapper import TypeDBWrapper, TypeDBConfig + +@pytest.fixture +def mock_typedb(): + """Mock TypeDB client and session.""" + with patch('swarms.utils.typedb_wrapper.TypeDB') as mock_typedb: + mock_client = Mock() + mock_session = Mock() + mock_typedb.core_client.return_value = mock_client + mock_client.session.return_value = mock_session + yield mock_typedb, mock_client, mock_session + +@pytest.fixture +def typedb_wrapper(mock_typedb): + """Create a TypeDBWrapper instance with mocked dependencies.""" + config = TypeDBConfig( + uri="test:1729", + database="test_db", + username="test_user", + password="test_pass" + ) + return TypeDBWrapper(config) + +def test_initialization(typedb_wrapper): + """Test TypeDBWrapper initialization.""" + assert typedb_wrapper.config.uri == "test:1729" + assert typedb_wrapper.config.database == "test_db" + assert typedb_wrapper.config.username == "test_user" + assert typedb_wrapper.config.password == "test_pass" + +def test_connect(typedb_wrapper, mock_typedb): + """Test connection to TypeDB.""" + mock_typedb, mock_client, mock_session = mock_typedb + typedb_wrapper._connect() + + mock_typedb.core_client.assert_called_once_with("test:1729") + mock_client.session.assert_called_once_with( + "test_db", + "DATA", + "test_user", + "test_pass" + ) + +def test_define_schema(typedb_wrapper, mock_typedb): + """Test schema definition.""" + mock_typedb, mock_client, mock_session = mock_typedb + schema = "define person sub entity;" + + with patch.object(typedb_wrapper.session, 'transaction') as mock_transaction: + mock_transaction.return_value.__enter__.return_value.query.define.return_value = None + typedb_wrapper.define_schema(schema) + + mock_transaction.assert_called_once_with("WRITE") + mock_transaction.return_value.__enter__.return_value.query.define.assert_called_once_with(schema) + +def test_insert_data(typedb_wrapper, mock_typedb): + """Test data insertion.""" + mock_typedb, mock_client, mock_session = mock_typedb + query = "insert $p isa person;" + + with patch.object(typedb_wrapper.session, 'transaction') as mock_transaction: + mock_transaction.return_value.__enter__.return_value.query.insert.return_value = None + typedb_wrapper.insert_data(query) + + mock_transaction.assert_called_once_with("WRITE") + mock_transaction.return_value.__enter__.return_value.query.insert.assert_called_once_with(query) + +def test_query_data(typedb_wrapper, mock_typedb): + """Test data querying.""" + mock_typedb, mock_client, mock_session = mock_typedb + query = "match $p isa person; get;" + mock_result = [Mock()] + + with patch.object(typedb_wrapper.session, 'transaction') as mock_transaction: + mock_transaction.return_value.__enter__.return_value.query.get.return_value = mock_result + result = typedb_wrapper.query_data(query) + + mock_transaction.assert_called_once_with("READ") + mock_transaction.return_value.__enter__.return_value.query.get.assert_called_once_with(query) + assert len(result) == 1 + +def test_delete_data(typedb_wrapper, mock_typedb): + """Test data deletion.""" + mock_typedb, mock_client, mock_session = mock_typedb + query = "match $p isa person; delete $p;" + + with patch.object(typedb_wrapper.session, 'transaction') as mock_transaction: + mock_transaction.return_value.__enter__.return_value.query.delete.return_value = None + typedb_wrapper.delete_data(query) + + mock_transaction.assert_called_once_with("WRITE") + mock_transaction.return_value.__enter__.return_value.query.delete.assert_called_once_with(query) + +def test_close(typedb_wrapper, mock_typedb): + """Test connection closing.""" + mock_typedb, mock_client, mock_session = mock_typedb + typedb_wrapper.close() + + mock_session.close.assert_called_once() + mock_client.close.assert_called_once() + +def test_context_manager(typedb_wrapper, mock_typedb): + """Test context manager functionality.""" + mock_typedb, mock_client, mock_session = mock_typedb + + with typedb_wrapper as db: + assert db == typedb_wrapper + + mock_session.close.assert_called_once() + mock_client.close.assert_called_once() + +def test_error_handling(typedb_wrapper, mock_typedb): + """Test error handling.""" + mock_typedb, mock_client, mock_session = mock_typedb + + # Test connection error + mock_typedb.core_client.side_effect = Exception("Connection failed") + with pytest.raises(Exception) as exc_info: + typedb_wrapper._connect() + assert "Connection failed" in str(exc_info.value) + + # Test query error + with patch.object(typedb_wrapper.session, 'transaction') as mock_transaction: + mock_transaction.return_value.__enter__.return_value.query.get.side_effect = Exception("Query failed") + with pytest.raises(Exception) as exc_info: + typedb_wrapper.query_data("test query") + assert "Query failed" in str(exc_info.value) \ No newline at end of file