Add TypeDB integration with wrapper and example

pull/803/head
ascender1729 3 weeks ago
parent c8d537c7ab
commit e397b8b19e

@ -0,0 +1,106 @@
from swarms.utils.typedb_wrapper import TypeDBWrapper, TypeDBConfig
def main():
# Initialize TypeDB wrapper with custom configuration
config = TypeDBConfig(
uri="localhost:1729",
database="swarms_example",
username="admin",
password="password"
)
# Define schema for a simple knowledge graph
schema = """
define
person sub entity,
owns name: string,
owns age: long,
plays role;
role sub entity,
owns title: string,
owns department: string;
works_at sub relation,
relates person,
relates role;
"""
# Example data insertion
insert_queries = [
"""
insert
$p isa person, has name "John Doe", has age 30;
$r isa role, has title "Software Engineer", has department "Engineering";
(person: $p, role: $r) isa works_at;
""",
"""
insert
$p isa person, has name "Jane Smith", has age 28;
$r isa role, has title "Data Scientist", has department "Data Science";
(person: $p, role: $r) isa works_at;
"""
]
# Example queries
query_queries = [
# Get all people
"match $p isa person; get;",
# Get people in Engineering department
"""
match
$p isa person;
$r isa role, has department "Engineering";
(person: $p, role: $r) isa works_at;
get $p;
""",
# Get people with their roles
"""
match
$p isa person, has name $n;
$r isa role, has title $t;
(person: $p, role: $r) isa works_at;
get $n, $t;
"""
]
try:
with TypeDBWrapper(config) as db:
# Define schema
print("Defining schema...")
db.define_schema(schema)
# Insert data
print("\nInserting data...")
for query in insert_queries:
db.insert_data(query)
# Query data
print("\nQuerying data...")
for i, query in enumerate(query_queries, 1):
print(f"\nQuery {i}:")
results = db.query_data(query)
print(f"Results: {results}")
# Example of deleting data
print("\nDeleting data...")
delete_query = """
match
$p isa person, has name "John Doe";
delete $p;
"""
db.delete_data(delete_query)
# Verify deletion
print("\nVerifying deletion...")
verify_query = "match $p isa person, has name $n; get $n;"
results = db.query_data(verify_query)
print(f"Remaining people: {results}")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
main()

@ -29,3 +29,6 @@ flake8-comprehensions>=3.12.0
flake8-simplify>=0.19.3 flake8-simplify>=0.19.3
flake8-unused-arguments>=0.0.4 flake8-unused-arguments>=0.0.4
pyupgrade>=3.15.0 pyupgrade>=3.15.0
typedb-client>=2.25.0
typedb-protocol>=2.25.0
typedb-driver>=2.25.0

@ -0,0 +1,168 @@
from typing import Dict, List, Optional, Any, Union
from loguru import logger
from typedb.client import TypeDB, SessionType, TransactionType
from typedb.api.connection.transaction import Transaction
from dataclasses import dataclass
import json
@dataclass
class TypeDBConfig:
"""Configuration for TypeDB connection."""
uri: str = "localhost:1729"
database: str = "swarms"
username: Optional[str] = None
password: Optional[str] = None
timeout: int = 30
class TypeDBWrapper:
"""
A wrapper class for TypeDB that provides graph database operations for Swarms.
This class handles connection, schema management, and data operations.
"""
def __init__(self, config: Optional[TypeDBConfig] = None):
"""
Initialize the TypeDB wrapper with the given configuration.
Args:
config (Optional[TypeDBConfig]): Configuration for TypeDB connection.
"""
self.config = config or TypeDBConfig()
self.client = None
self.session = None
self._connect()
def _connect(self) -> None:
"""Establish connection to TypeDB."""
try:
self.client = TypeDB.core_client(self.config.uri)
if self.config.username and self.config.password:
self.session = self.client.session(
self.config.database,
SessionType.DATA,
self.config.username,
self.config.password
)
else:
self.session = self.client.session(
self.config.database,
SessionType.DATA
)
logger.info(f"Connected to TypeDB at {self.config.uri}")
except Exception as e:
logger.error(f"Failed to connect to TypeDB: {e}")
raise
def _ensure_connection(self) -> None:
"""Ensure connection is active, reconnect if necessary."""
if not self.session or not self.session.is_open():
self._connect()
def define_schema(self, schema: str) -> None:
"""
Define the database schema.
Args:
schema (str): TypeQL schema definition.
"""
try:
with self.session.transaction(TransactionType.WRITE) as transaction:
transaction.query.define(schema)
transaction.commit()
logger.info("Schema defined successfully")
except Exception as e:
logger.error(f"Failed to define schema: {e}")
raise
def insert_data(self, query: str) -> None:
"""
Insert data using TypeQL query.
Args:
query (str): TypeQL insert query.
"""
try:
with self.session.transaction(TransactionType.WRITE) as transaction:
transaction.query.insert(query)
transaction.commit()
logger.info("Data inserted successfully")
except Exception as e:
logger.error(f"Failed to insert data: {e}")
raise
def query_data(self, query: str) -> List[Dict[str, Any]]:
"""
Query data using TypeQL query.
Args:
query (str): TypeQL query.
Returns:
List[Dict[str, Any]]: Query results.
"""
try:
with self.session.transaction(TransactionType.READ) as transaction:
result = transaction.query.get(query)
return [self._convert_concept_to_dict(concept) for concept in result]
except Exception as e:
logger.error(f"Failed to query data: {e}")
raise
def _convert_concept_to_dict(self, concept: Any) -> Dict[str, Any]:
"""
Convert a TypeDB concept to a dictionary.
Args:
concept: TypeDB concept.
Returns:
Dict[str, Any]: Dictionary representation of the concept.
"""
try:
if hasattr(concept, "get_type"):
concept_type = concept.get_type()
if hasattr(concept, "get_value"):
return {
"type": concept_type.get_label_name(),
"value": concept.get_value()
}
elif hasattr(concept, "get_attributes"):
return {
"type": concept_type.get_label_name(),
"attributes": {
attr.get_type().get_label_name(): attr.get_value()
for attr in concept.get_attributes()
}
}
return {"type": "unknown", "value": str(concept)}
except Exception as e:
logger.error(f"Failed to convert concept to dict: {e}")
return {"type": "error", "value": str(e)}
def delete_data(self, query: str) -> None:
"""
Delete data using TypeQL query.
Args:
query (str): TypeQL delete query.
"""
try:
with self.session.transaction(TransactionType.WRITE) as transaction:
transaction.query.delete(query)
transaction.commit()
logger.info("Data deleted successfully")
except Exception as e:
logger.error(f"Failed to delete data: {e}")
raise
def close(self) -> None:
"""Close the TypeDB connection."""
try:
if self.session:
self.session.close()
if self.client:
self.client.close()
logger.info("TypeDB connection closed")
except Exception as e:
logger.error(f"Failed to close TypeDB connection: {e}")
raise
def __enter__(self):
"""Context manager entry."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.close()

@ -0,0 +1,129 @@
import pytest
from unittest.mock import Mock, patch
from swarms.utils.typedb_wrapper import TypeDBWrapper, TypeDBConfig
@pytest.fixture
def mock_typedb():
"""Mock TypeDB client and session."""
with patch('swarms.utils.typedb_wrapper.TypeDB') as mock_typedb:
mock_client = Mock()
mock_session = Mock()
mock_typedb.core_client.return_value = mock_client
mock_client.session.return_value = mock_session
yield mock_typedb, mock_client, mock_session
@pytest.fixture
def typedb_wrapper(mock_typedb):
"""Create a TypeDBWrapper instance with mocked dependencies."""
config = TypeDBConfig(
uri="test:1729",
database="test_db",
username="test_user",
password="test_pass"
)
return TypeDBWrapper(config)
def test_initialization(typedb_wrapper):
"""Test TypeDBWrapper initialization."""
assert typedb_wrapper.config.uri == "test:1729"
assert typedb_wrapper.config.database == "test_db"
assert typedb_wrapper.config.username == "test_user"
assert typedb_wrapper.config.password == "test_pass"
def test_connect(typedb_wrapper, mock_typedb):
"""Test connection to TypeDB."""
mock_typedb, mock_client, mock_session = mock_typedb
typedb_wrapper._connect()
mock_typedb.core_client.assert_called_once_with("test:1729")
mock_client.session.assert_called_once_with(
"test_db",
"DATA",
"test_user",
"test_pass"
)
def test_define_schema(typedb_wrapper, mock_typedb):
"""Test schema definition."""
mock_typedb, mock_client, mock_session = mock_typedb
schema = "define person sub entity;"
with patch.object(typedb_wrapper.session, 'transaction') as mock_transaction:
mock_transaction.return_value.__enter__.return_value.query.define.return_value = None
typedb_wrapper.define_schema(schema)
mock_transaction.assert_called_once_with("WRITE")
mock_transaction.return_value.__enter__.return_value.query.define.assert_called_once_with(schema)
def test_insert_data(typedb_wrapper, mock_typedb):
"""Test data insertion."""
mock_typedb, mock_client, mock_session = mock_typedb
query = "insert $p isa person;"
with patch.object(typedb_wrapper.session, 'transaction') as mock_transaction:
mock_transaction.return_value.__enter__.return_value.query.insert.return_value = None
typedb_wrapper.insert_data(query)
mock_transaction.assert_called_once_with("WRITE")
mock_transaction.return_value.__enter__.return_value.query.insert.assert_called_once_with(query)
def test_query_data(typedb_wrapper, mock_typedb):
"""Test data querying."""
mock_typedb, mock_client, mock_session = mock_typedb
query = "match $p isa person; get;"
mock_result = [Mock()]
with patch.object(typedb_wrapper.session, 'transaction') as mock_transaction:
mock_transaction.return_value.__enter__.return_value.query.get.return_value = mock_result
result = typedb_wrapper.query_data(query)
mock_transaction.assert_called_once_with("READ")
mock_transaction.return_value.__enter__.return_value.query.get.assert_called_once_with(query)
assert len(result) == 1
def test_delete_data(typedb_wrapper, mock_typedb):
"""Test data deletion."""
mock_typedb, mock_client, mock_session = mock_typedb
query = "match $p isa person; delete $p;"
with patch.object(typedb_wrapper.session, 'transaction') as mock_transaction:
mock_transaction.return_value.__enter__.return_value.query.delete.return_value = None
typedb_wrapper.delete_data(query)
mock_transaction.assert_called_once_with("WRITE")
mock_transaction.return_value.__enter__.return_value.query.delete.assert_called_once_with(query)
def test_close(typedb_wrapper, mock_typedb):
"""Test connection closing."""
mock_typedb, mock_client, mock_session = mock_typedb
typedb_wrapper.close()
mock_session.close.assert_called_once()
mock_client.close.assert_called_once()
def test_context_manager(typedb_wrapper, mock_typedb):
"""Test context manager functionality."""
mock_typedb, mock_client, mock_session = mock_typedb
with typedb_wrapper as db:
assert db == typedb_wrapper
mock_session.close.assert_called_once()
mock_client.close.assert_called_once()
def test_error_handling(typedb_wrapper, mock_typedb):
"""Test error handling."""
mock_typedb, mock_client, mock_session = mock_typedb
# Test connection error
mock_typedb.core_client.side_effect = Exception("Connection failed")
with pytest.raises(Exception) as exc_info:
typedb_wrapper._connect()
assert "Connection failed" in str(exc_info.value)
# Test query error
with patch.object(typedb_wrapper.session, 'transaction') as mock_transaction:
mock_transaction.return_value.__enter__.return_value.query.get.side_effect = Exception("Query failed")
with pytest.raises(Exception) as exc_info:
typedb_wrapper.query_data("test query")
assert "Query failed" in str(exc_info.value)
Loading…
Cancel
Save